Spark Streaming - Twitter
Github Project : https://github.com/saagie/example-spark-streaming-twitter
Common part
application.properties
# Set your own Twitter credentials
twitter4j.oauth.consumerKey=
twitter4j.oauth.consumerSecret=
twitter4j.oauth.accessToken=
twitter4j.oauth.accessTokenSecret=
assembly Dependency
// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
sbt dependencies
import sbt.Keys._

name := "spark-streaming-twitter"

version := "1.0"

scalaVersion := "2.10.5"

// Spark core/streaming are "provided": the cluster supplies them at runtime,
// so they are excluded from the assembly jar to keep the fat jar small.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  // The Twitter receiver is NOT part of the Spark distribution, so it must
  // be bundled into the assembly jar (no "provided" scope here).
  "org.apache.spark" %% "spark-streaming-twitter" % "1.6.1",
  "org.apache.commons" % "commons-lang3" % "3.0",
  "org.slf4j" % "slf4j-api" % "1.7.2",
  "com.typesafe" % "config" % "1.2.1"
)

// Use HTTPS: plain-HTTP resolvers are insecure and are refused by recent sbt
// versions (artifacts fetched over HTTP can be tampered with in transit).
resolvers += "Akka Repository" at "https://repo.akka.io/releases/"

// Several Spark artifacts ship the same UnusedStubClass; taking the first
// copy avoids sbt-assembly "deduplicate" merge errors. Everything else falls
// through to the default strategy.
assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") =>
    MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

// Run the app in a forked JVM so sbt's own classpath and JVM options do not
// leak into the application.
fork := true
How to load config from application.properties?
Code example
// Load the Twitter OAuth credentials from application.properties
// (resolved by Typesafe Config's standard classpath lookup).
val config = ConfigFactory.load()
val consumerKey = config.getString("twitter4j.oauth.consumerKey")
val consumerSecret = config.getString("twitter4j.oauth.consumerSecret")
val accessToken = config.getString("twitter4j.oauth.accessToken")
val accessTokenSecret = config.getString("twitter4j.oauth.accessTokenSecret")

// The Twitter4j library used by the twitter stream reads its OAuth settings
// from JVM system properties, so publish each credential there.
Seq(
  "twitter4j.oauth.consumerKey" -> consumerKey,
  "twitter4j.oauth.consumerSecret" -> consumerSecret,
  "twitter4j.oauth.accessToken" -> accessToken,
  "twitter4j.oauth.accessTokenSecret" -> accessTokenSecret
).foreach { case (name, value) => System.setProperty(name, value) }
How to initiate stream for twitter?
Code example
// To use the twitter api you must supply filter terms.
// Fixed: TwitterUtils.createStream expects filters: Seq[String]; the original
// passed a bare String, which does not compile.
val filters = Seq("test")

// Modest resources for this demo job: 512 MB and 2 cores per executor.
val sparkConf = new SparkConf()
  .setAppName("TwitterPopularTags")
  .set("spark.executor.memory", "512M")
  .set("spark.executor.cores", "2")

// Micro-batch interval: one RDD every 10 seconds.
val ssc = new StreamingContext(sparkConf, Seconds(10))

// None => Twitter4j builds its OAuth credentials from the twitter4j.*
// system properties set earlier.
val stream = TwitterUtils.createStream(ssc, None, filters)

// Put your code here

// Launch the streaming
ssc.start()
ssc.awaitTermination()
How to get the most popular topics from the last 30 seconds, log them to the console and save them to HDFS?
Code example
// Count hashtag occurrences over a sliding 30-second window and keep only
// the entries with the highest counts.
val topHashTags = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30))
  .map(_.swap) // (tag, count) -> (count, tag) so we can sort by count
  .transform(
    _.sortByKey(false) // descending by count
      // NOTE(review): `_._2 < 11` keeps the first 11 entries, not a top-10 —
      // confirm the intended cutoff.
      .zipWithIndex.filter(_._2 < 11).map(_._1)
  )
  .map(_.swap) // back to (tag, count)

topHashTags.foreachRDD((rdd, ts) => {
  // Fixed: rdd.partitions.isEmpty only checks the partition COUNT — it is
  // false even when the RDD has partitions but no data; isEmpty checks data.
  if (!rdd.isEmpty()) {
    logger.info("Popular topics in last 30 seconds (%s total):".format(rdd.count()))
    // Fixed: rdd.foreach(println) runs on the executors, so nothing reaches
    // the driver console; the top list is tiny, so collect it first.
    // Also fixed: elements are (tag, count) after the final swap — the
    // original bound them as (count, tag) and printed them reversed.
    rdd.collect().foreach { case (tag, count) => println("%s (%s tweets)".format(tag, count)) }
  }
})

// Persist each window's top list to HDFS, one directory per batch.
// NOTE(review): assumes hdfsMaster ends with "/" (e.g. "hdfs://host:8020/");
// otherwise the resulting path is malformed — confirm against its definition.
topHashTags.saveAsTextFiles(hdfsMaster + "user/hdfs/wiki/topHashTags")
