Spark Streaming - Twitter

Github Project : https://github.com/saagie/example-spark-streaming-twitter

Common part

application.properties

// Set your own credentials
twitter4j.oauth.consumerKey=
twitter4j.oauth.consumerSecret=
twitter4j.oauth.accessToken=
twitter4j.oauth.accessTokenSecret=

Assembly dependency

// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

sbt dependencies

import sbt.Keys._

name := "spark-streaming-twitter"

version := "1.0"

scalaVersion := "2.10.5"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming" % "1.6.1" % "provided",
  "org.apache.spark" %% "spark-streaming-twitter" % "1.6.1",
  "org.apache.commons" % "commons-lang3" % "3.0",
  "org.slf4j" % "slf4j-api" % "1.7.2",
  "com.typesafe" % "config" % "1.2.1"
)

resolvers += "Akka Repository" at "http://repo.akka.io/releases/"

assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") =>
    MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

fork := true
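With the plugin and merge strategy in place, the fat jar can be built and submitted roughly as follows. This is a sketch: the main class name and master URL are assumptions, so adjust them to your project.

```shell
# Build the assembly (fat) jar; output lands under target/scala-2.10/
sbt assembly

# Submit it to Spark; the class name below is an assumption
spark-submit \
  --class com.example.TwitterStreaming \
  --master yarn-client \
  target/scala-2.10/spark-streaming-twitter-assembly-1.0.jar
```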

How to load config from application.properties?

Code example

val config = ConfigFactory.load()
val consumerKey = config.getString("twitter4j.oauth.consumerKey")
val consumerSecret = config.getString("twitter4j.oauth.consumerSecret")
val accessToken = config.getString("twitter4j.oauth.accessToken")
val accessTokenSecret = config.getString("twitter4j.oauth.accessTokenSecret")

// Set the system properties so that the Twitter4j library used by the twitter
// stream can use them to generate OAuth credentials
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)


How to initiate a Twitter stream?

Code example

// The Twitter API requires at least one filter keyword
// (createStream expects a Seq[String], not a plain String)
val filters = Seq("test")
val sparkConf = new SparkConf()
  .setAppName("TwitterPopularTags")
  .set("spark.executor.memory", "512M")
  .set("spark.executor.cores", "2")
val ssc = new StreamingContext(sparkConf, Seconds(10))
val stream = TwitterUtils.createStream(ssc, None, filters)
 
// Put your code here
 
// Start the streaming context and block until termination
ssc.start()
ssc.awaitTermination()
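The examples below operate on a `hashTags` DStream that is not defined above. A minimal sketch of how it could be built from the stream, assuming hashtags are whitespace-separated tokens starting with "#" (the extraction itself is plain Scala):

```scala
// Hypothetical extraction of hashtag tokens from tweet text.
// In the streaming job it would be applied via:
//   val hashTags = stream.flatMap(status => extractHashTags(status.getText))
def extractHashTags(text: String): Seq[String] =
  text.split(" ").filter(_.startsWith("#")).toSeq
```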


How to get the popular topics from the last 30 seconds, log them to the console and save them to HDFS?

Code example

// hashTags is a DStream[String] of hashtag tokens extracted from the tweets
val topHashTags = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(30))
  .map(_.swap)
  .transform(
    // sort by count (descending) and keep only the ten most frequent tags
    _.sortByKey(ascending = false)
      .zipWithIndex.filter(_._2 < 10).map(_._1)
  )
  .map(_.swap)
 
topHashTags.foreachRDD((rdd, ts) => {
  // rdd.isEmpty() checks for actual data; rdd.partitions.isEmpty only
  // checks whether the RDD has partitions, which it almost always does
  if (!rdd.isEmpty()) {
    logger.info("Popular topics in last 30 seconds (%s total):".format(rdd.count()))
    rdd.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
  }
})
 
// hdfsMaster must end with "/" (e.g. "hdfs://namenode:8020/")
topHashTags.saveAsTextFiles(hdfsMaster + "user/hdfs/wiki/topHashTags")
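The `transform` step above selects the most frequent tags in each window. A plain-Scala sketch of the same top-N selection over a static collection (the counts are made up for illustration):

```scala
// (tag, count) pairs as produced by reduceByKeyAndWindow, here hard-coded
val counts = Seq(("#spark", 5), ("#scala", 3), ("#bigdata", 8))

// swap to (count, tag), sort descending on count, keep the first 10, swap back
val topN = counts.map(_.swap).sortBy(-_._1).take(10).map(_.swap)
```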