
GitHub project: example-spark-scala-read-and-write-from-mongo

Common part

sbt Dependencies

Code Block
languagescala
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"
libraryDependencies += "com.databricks" %% "spark-csv" % "1.3.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.0.0"

assembly Dependency

Code Block
languagescala
// In build.sbt
import sbt.Keys._
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
Code Block
languagescala
// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

Mongo URI

A Mongo URI looks like this: mongodb://username:password@host:27017/database

The default port is 27017.
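In the snippets below the URI is read from a params object. As a minimal sketch (the Params case class and its values are assumptions, not part of the project), it could be defined like this:

Code Block
languagescala
// Hypothetical holder for job parameters; adapt to your own argument parsing
case class Params(mongoUri: String)

// Placeholder credentials, host and database
val params = Params("mongodb://username:password@localhost:27017/mydatabase")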

Init SparkContext and SQLContext

You need to define an input and an output collection depending on your needs.


Code Block
languagescala
val conf = new SparkConf()
  .setAppName("example-spark-scala-read-and-write-from-mongo")
  // Configuration for writing to a Mongo collection
  .set("spark.mongodb.output.uri", params.mongoUri)
  .set("spark.mongodb.output.collection", "restaurants")
  // Configuration for reading a Mongo collection
  .set("spark.mongodb.input.uri", params.mongoUri)
  .set("spark.mongodb.input.collection", "restaurants")
  // Type of partitioner used to split the collection into DataFrame partitions
  .set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
  // Number of partitions in the resulting DataFrame
  .set("spark.mongodb.input.partitionerOptions.MongoPaginateByCountPartitioner.numberOfPartitions", "1")

How to write a DataFrame to a Mongo collection with Spark Scala?

Code example

Code Block
languagescala
import com.mongodb.spark.MongoSpark

// Case classes representing the documents
case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

// Creating a DataFrame containing the documents
val dfRestaurants = Seq(
  Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"), "Manhattan", "Italian", "Vella"),
  Restaurant(Address("1007", Array(-73.856077, 40.848447), "Morris Park Ave", "10462"), "Bronx", "Bakery", "Morris Park Bake Shop")
).toDF().coalesce(1)

// Writing the DataFrame to the configured Mongo collection
MongoSpark.save(dfRestaurants.write.mode("overwrite"))
logger.info("Writing documents in Mongo : OK")

How to read documents from a Mongo collection with Spark Scala?

Code example

Code Block
languagescala
import com.mongodb.spark.MongoSpark

// Reading the configured Mongo collection into a DataFrame
val df = MongoSpark.load(sqlContext)
df.show()
logger.info("Reading documents from Mongo : OK")