Spark Streaming - Kafka

GitHub project: https://github.com/saagie/example-spark-streaming-kafka


SBT dependencies
import sbt.Keys._

name := "example-spark-streaming-kafka"

version := "1.0"

scalaVersion := "2.11.11"

// Spark 2.1.0 with the Kafka 0.10 integration; scopt parses the command-line arguments
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0",
  "com.github.scopt" %% "scopt" % "3.6.0"
)

// Several Spark artifacts ship the same UnusedStubClass; keep the first copy in the fat jar
assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

mainClass in assembly := Some("io.saagie.kafka.Kafka")
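
The assembly settings above come from the sbt-assembly plugin, which packages the job and its dependencies into a single fat jar for spark-submit. If it is not already enabled in the project, a minimal project/plugins.sbt would look like this (the version below is an assumption; use one compatible with your sbt release):

// project/plugins.sbt — provides the assembly task and settings used above
// (plugin version is an assumption)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")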


These are the typical parameters for connecting to Kafka. params.kafkaBootstrapServer holds the hostname and port of the Kafka bootstrap servers.

Kafka parameters
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> params.kafkaBootstrapServer,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka example",
  // Start from the earliest offset when the group has no committed offset
  "auto.offset.reset" -> "earliest",
  // Disable auto-commit so the application decides when offsets are committed
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
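
The params object used throughout these snippets is parsed from the command line with scopt (declared in the SBT dependencies above). Purely as an illustration, a minimal sketch could look like this — the case class, defaults, and option names below are assumptions inferred from how params is used in this example, not the project's actual code:

// Hypothetical sketch of the scopt-parsed parameters; field names mirror
// the usages in this example, defaults are placeholders
case class Params(kafkaBootstrapServer: String = "localhost:9092",
                  kafkaTopics: Seq[String] = Seq("test"),
                  hdfsHost: String = "hdfs://localhost:8020")

val parser = new scopt.OptionParser[Params]("example-spark-streaming-kafka") {
  opt[String]("bootstrap-servers").action((v, p) => p.copy(kafkaBootstrapServer = v))
  opt[Seq[String]]("topics").action((v, p) => p.copy(kafkaTopics = v))
  opt[String]("hdfs-host").action((v, p) => p.copy(hdfsHost = v))
}

// parse returns None (and prints usage) when the arguments are invalid
parser.parse(args, Params()).foreach { params =>
  // ... build and start the streaming job shown below ...
}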


Start streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent

// One micro-batch every 10 seconds
val sparkConf = new SparkConf()
  .setAppName("kafka example")
val streamingContext = new StreamingContext(sparkConf, Seconds(10))

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](params.kafkaTopics, kafkaParams)
)

// Keep only the (key, value) pair of each consumer record
val rdds = stream.map(r => (r.key(), r.value()))

// Log every record of each non-empty batch
rdds.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.foreach(r => logger.info("key: %s, value: %s".format(r._1, r._2)))
  }
}

// Also write each batch to HDFS as text files (one directory per batch)
rdds.saveAsTextFiles(params.hdfsHost + "/user/hdfs/kafka")

// Start the job and block until it is stopped or fails
streamingContext.start()
streamingContext.awaitTermination()
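
Since enable.auto.commit is false, this example never commits offsets back to Kafka, so a restarted job falls back to auto.offset.reset. If you want to track progress in Kafka itself, the 0-10 connector lets you commit the offset ranges of each batch once it has been processed. A sketch of that pattern — it has to operate on the stream returned by createDirectStream, since only its RDDs carry offset information:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // RDDs produced by the direct stream carry their Kafka offset ranges
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // ... process the batch here ...

  // Commit asynchronously once the batch has been handled
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}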