Spark Streaming - Kafka
GitHub project: https://github.com/saagie/example-spark-streaming-kafka
SBT dependencies
import sbt.Keys._

name := "example-spark-streaming-kafka"

version := "1.0"

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0",
  "org.apache.spark" %% "spark-streaming" % "2.1.0",
  "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.1.0",
  "com.github.scopt" %% "scopt" % "3.6.0"
)

assemblyMergeStrategy in assembly := {
  case PathList("org", "apache", "spark", "unused", "UnusedStubClass.class") => MergeStrategy.first
  case x =>
    val oldStrategy = (assemblyMergeStrategy in assembly).value
    oldStrategy(x)
}

mainClass in assembly := Some("io.saagie.kafka.Kafka")
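The snippets below reference a params object (params.kafkaBootstrapServer, params.kafkaTopics, params.hdfsHost). Since the build pulls in scopt, here is a minimal sketch of what the corresponding case class and parser might look like; the option names and defaults are assumptions, only the field names come from the code below.

import scopt.OptionParser

// Hypothetical holder for the command-line arguments used in the snippets below.
case class Params(
  kafkaBootstrapServer: String = "",
  kafkaTopics: Seq[String] = Seq.empty,
  hdfsHost: String = ""
)

val parser = new OptionParser[Params]("example-spark-streaming-kafka") {
  opt[String]("bootstrap-server").required()
    .action((x, c) => c.copy(kafkaBootstrapServer = x))
    .text("Kafka bootstrap server, as host:port")
  opt[Seq[String]]("topics").required()
    .action((x, c) => c.copy(kafkaTopics = x))
    .text("Comma-separated list of Kafka topics to subscribe to")
  opt[String]("hdfs-host").required()
    .action((x, c) => c.copy(hdfsHost = x))
    .text("HDFS namenode URI")
}

// parser.parse(args, Params()) returns Some(params) on success, None otherwise.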
These are the typical parameters needed to create a Kafka consumer. params.kafkaBootstrapServer is the Kafka bootstrap server's hostname and port.
Kafka parameters
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> params.kafkaBootstrapServer,
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "kafka example",
  "auto.offset.reset" -> "earliest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
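To have something to consume while testing, you can push a few messages into the topic with the plain Kafka producer API (kafka-clients comes in transitively with spark-streaming-kafka-0-10). This is just a sketch for local testing; the key/value contents are made up.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val props = new Properties()
props.put("bootstrap.servers", params.kafkaBootstrapServer)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// Send ten test records to the first subscribed topic.
(1 to 10).foreach { i =>
  producer.send(new ProducerRecord[String, String](params.kafkaTopics.head, s"key-$i", s"value-$i"))
}
producer.close()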
Start streaming
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val sparkConf = new SparkConf()
  .setAppName("kafka example")
val streamingContext = new StreamingContext(sparkConf, Seconds(10))

val stream = KafkaUtils.createDirectStream[String, String](
  streamingContext,
  PreferConsistent,
  Subscribe[String, String](params.kafkaTopics, kafkaParams)
)

// Extract (key, value) pairs from each ConsumerRecord.
val rdds = stream.map(r => (r.key(), r.value()))

rdds.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    rdd.foreach(r => logger.info("key: %s, value: %s".format(r._1, r._2)))
  }
}

// Persist each batch to HDFS as text files.
rdds.saveAsTextFiles(params.hdfsHost + "/user/hdfs/kafka")

streamingContext.start()
streamingContext.awaitTermination()
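Because enable.auto.commit is set to false above, offsets are never stored back to Kafka, so a restarted job falls back to auto.offset.reset and may reprocess everything. A minimal sketch of manual offset commits, following the CanCommitOffsets / HasOffsetRanges pattern from the spark-streaming-kafka-0-10 integration guide and applied to the stream created above:

import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges}

stream.foreachRDD { rdd =>
  // The offset ranges covered by this batch, one per Kafka partition.
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  // ... process the batch here ...
  // Commit the offsets back to Kafka once the batch has been handled,
  // so a restarted job resumes from where it left off.
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}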