Spark Streaming - Twitter
GitHub project: https://github.com/saagie/example-spark-streaming-twitter
Common part
application.properties
```properties
# Set your own Twitter credentials
twitter4j.oauth.consumerKey=
twitter4j.oauth.consumerSecret=
twitter4j.oauth.accessToken=
twitter4j.oauth.accessTokenSecret=
```
sbt-assembly plugin
```scala
// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
```
sbt dependencies
```scala
import sbt.Keys._

name := "spark-streaming-twitter"
version := "1.0"
scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.6.1",
  "org.apache.commons" % "commons-lang3" % "3.0",
  "org.slf4j" % "slf4j-api" % "1.7.2",
  "com.typesafe" % "config" % "1.2.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

fork := true
```
How to load config from application.properties?
Code example
```scala
val config = ConfigFactory.load()
val consumerKey = config.getString("twitter4j.oauth.consumerKey")
val consumerSecret = config.getString("twitter4j.oauth.consumerSecret")
val accessToken = config.getString("twitter4j.oauth.accessToken")
val accessTokenSecret = config.getString("twitter4j.oauth.accessTokenSecret")

// Set the system properties so that the Twitter4j library used by the
// Twitter stream can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
```
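The pattern above reads the credentials once and mirrors them into JVM system properties, because Twitter4j picks up its OAuth settings from `System` properties at runtime. That copy step can be sketched without Spark or the Config library; the `propagate` helper and the `loaded` map below are illustrative stand-ins, not part of the project:

```scala
// Sketch (assumption): copy a fixed set of OAuth keys from an
// already-loaded configuration map into JVM system properties,
// mirroring what the ConfigFactory-based code above does.
object OAuthProps {
  val keys = Seq(
    "twitter4j.oauth.consumerKey",
    "twitter4j.oauth.consumerSecret",
    "twitter4j.oauth.accessToken",
    "twitter4j.oauth.accessTokenSecret")

  // `loaded` stands in for values read via ConfigFactory.load()
  def propagate(loaded: Map[String, String]): Unit =
    keys.foreach(k => loaded.get(k).foreach(v => System.setProperty(k, v)))
}
```

Keys missing from the map are simply skipped, so partial configurations do not overwrite existing system properties with nulls.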
How to initiate a Twitter stream?
Code example
```scala
// To use the Twitter API you must supply filters
val filters = Seq("test")

val sparkConf = new SparkConf()
  .setAppName("TwitterPopularTags")
  .set("spark.executor.memory", "512M")
  .set("spark.executor.cores", "2")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val stream = TwitterUtils.createStream(ssc, None, filters)

// Put your code here

// Launch the streaming
ssc.start()
ssc.awaitTermination()
```
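The next section operates on a `hashTags` DStream built from this `stream`; its construction is not shown here, but the per-tweet extraction is plain string processing. A minimal sketch, assuming hashtags are whitespace-separated tokens starting with `#` (in the streaming job this logic would run inside a `stream.flatMap(...)` over each status text):

```scala
// Sketch (assumption): extract hashtags from one tweet's text by
// splitting on whitespace and keeping tokens that start with '#'.
def extractHashTags(text: String): Seq[String] =
  text.split("\\s+").toSeq.filter(_.startsWith("#"))
```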
How to get the popular topics of the last 30 seconds, log them to the console, and save them to HDFS?
Code example
```scala
val topHashTags = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30))
  .map(_.swap)
  .transform(
    _.sortByKey(false)
      .zipWithIndex.filter(_._2 < 11).map(_._1)
  )
  .map(_.swap)

topHashTags.foreachRDD((rdd, ts) => {
  if (!rdd.partitions.isEmpty) {
    logger.info("Popular topics in last 30 seconds (%s total):".format(rdd.count()))
    rdd.foreach { case (tag, count) =>
      println("%s (%s tweets)".format(tag, count))
    }
  }
})

topHashTags.saveAsTextFiles(hdfsMaster + "user/hdfs/wiki/topHashTags")
```
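The windowed top-N logic above (count per tag, sort by count descending, keep the highest-ranked entries) can be illustrated on plain Scala collections, with no Spark involved. This is a sketch of the same computation on an in-memory batch, not the DStream code itself:

```scala
// Plain-collections sketch of the top-hashtag logic: count each tag's
// occurrences, sort by count descending, keep the top n.
def topTags(tags: Seq[String], n: Int = 10): Seq[(String, Int)] =
  tags.groupBy(identity)
    .map { case (tag, occs) => (tag, occs.size) }
    .toSeq
    .sortBy(-_._2)
    .take(n)
```

In the streaming version, `reduceByKeyAndWindow` plays the role of `groupBy` + counting over the last 30 seconds, and the `sortByKey` / `zipWithIndex` / `filter` chain does the sorting and truncation.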