...
GitHub project: example-spark-scala-read-and-write-from-mongo
Common part
sbt Dependencies
```scala
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "1.6.1" % "provided"
libraryDependencies += "com.databricks" %% "spark-csv" % "1.3.0"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "1.0.0"
```
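sbt-assembly plugin
Spark and the Scala library are already available on the cluster at runtime. The Spark modules are therefore marked "provided", and the Scala library is excluded from the assembled jar with includeScala = false.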
```scala
// In build.sbt
import sbt.Keys._
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
```
```scala
// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
```
Mongo URI
A Mongo URI has the form: mongodb://username:password@host:27017/database
The default port is 27017.
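The helper below is a hypothetical sketch that assembles such a URI and fills in the default port. The name mongoUri is not part of the project or the connector. MongoDB expects the username and password to be percent-encoded when they contain characters such as @, : or /. The sketch uses java.net.URLEncoder for this and replaces its "+" (form encoding for a space) with "%20".

```scala
import java.net.URLEncoder

object MongoUriExample {
  // Hypothetical helper (not part of the project or the connector): builds
  // mongodb://username:password@host:port/database with 27017 as the default port
  def mongoUri(user: String, password: String, host: String,
               database: String, port: Int = 27017): String = {
    // Percent-encode credentials; URLEncoder writes spaces as '+', so swap in %20
    def enc(s: String): String = URLEncoder.encode(s, "UTF-8").replace("+", "%20")
    s"mongodb://${enc(user)}:${enc(password)}@$host:$port/$database"
  }

  def main(args: Array[String]): Unit = {
    // prints mongodb://username:p%40ss%3Aword@host:27017/database
    println(mongoUri("username", "p@ss:word", "host", "database"))
  }
}
```

The resulting string is what would be passed as params.mongoUri in the configuration.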
Initializing SparkContext and SQLContext
You need to define an input collection and an output collection according to your needs.
...
```scala
import org.apache.spark.SparkConf

// params.mongoUri is defined outside this excerpt; it holds a URI such as
// mongodb://username:password@host:27017/database
val conf = new SparkConf()
  .setAppName("example-spark-scala-read-and-write-from-mongo")
  // Configuration for writing to a Mongo collection
  .set("spark.mongodb.output.uri", params.mongoUri)
  .set("spark.mongodb.output.collection", "restaurants")
  // Configuration for reading from a Mongo collection
  .set("spark.mongodb.input.uri", params.mongoUri)
  .set("spark.mongodb.input.collection", "restaurants")
  // Partitioner used to split the collection into Spark partitions when reading
  .set("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
  // Number of partitions in the resulting DataFrame
  .set("spark.mongodb.input.partitionerOptions.MongoPaginateByCountPartitioner.numberOfPartitions", "1")
```
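MongoPaginateByCountPartitioner creates a fixed number of partitions that each hold roughly the same number of documents. It does this by paginating over a partition key, which is _id by default. The sketch below is a simplified in-memory illustration of that idea, not the connector's implementation. It takes key values that are already sorted and picks split points so that each range holds about count / numberOfPartitions documents. With numberOfPartitions set to 1, as in the configuration above, the result is a single unbounded partition.

```scala
object PaginateByCountSketch {
  // Simplified illustration (not the connector's code): given partition key values
  // in ascending order, return (lowerInclusive, upperExclusive) bounds for each
  // partition; None means the range is open on that side.
  def partitionBounds[K](sortedKeys: Seq[K], numberOfPartitions: Int): Seq[(Option[K], Option[K])] = {
    require(numberOfPartitions > 0, "numberOfPartitions must be positive")
    if (sortedKeys.isEmpty || numberOfPartitions == 1) Seq((None, None))
    else {
      // Documents per partition, rounded up so no more partitions than requested are created
      val perPartition = math.ceil(sortedKeys.size.toDouble / numberOfPartitions).toInt
      // The key at every perPartition-th position (after the first) starts a new partition
      val splitKeys: Seq[Option[K]] =
        sortedKeys.indices.filter(i => i > 0 && i % perPartition == 0).map(i => Some(sortedKeys(i)))
      (None +: splitKeys).zip(splitKeys :+ None)
    }
  }

  def main(args: Array[String]): Unit = {
    // 10 documents with _id 1..10 split into 3 partitions;
    // prints (None,Some(5)), (Some(5),Some(9)), (Some(9),None) on separate lines
    partitionBounds(1 to 10, 3).foreach(println)
  }
}
```

The real partitioner queries MongoDB for each boundary instead of holding all keys in memory. This is why it is comparatively slow on large collections.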
How to write documents to a Mongo collection with Spark Scala?
Code example
```scala
import com.mongodb.spark.MongoSpark
// toDF() needs the SQLContext implicits in scope
import sqlContext.implicits._

// Classes representing the documents; define them at top level (outside any
// method) so that toDF() can derive their schema
case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

// Creating a DataFrame containing the documents
val dfRestaurants = Seq(
  Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"), "Manhattan", "Italian", "Vella"),
  Restaurant(Address("1007", Array(-73.856077, 40.848447), "Morris Park Ave", "10462"), "Bronx", "Bakery", "Morris Park Bake Shop")
).toDF().coalesce(1)

// Writing the DataFrame to the collection configured in spark.mongodb.output.*;
// mode "overwrite" replaces the documents already in that collection
MongoSpark.save(dfRestaurants.write.mode("overwrite"))
logger.info("Writing documents to Mongo: OK")
```
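Each DataFrame row becomes one document. The nested Address case class becomes an embedded document, and the coord array stays an array. The sketch below uses a hypothetical toDocumentJson helper to show roughly how the first restaurant should look in the collection. It only builds a string and does not involve Spark or MongoDB. Field order and number formatting in the real collection may differ, and MongoDB adds an _id field because the rows do not provide one.

```scala
object DocumentShapeSketch {
  case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
  case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

  // Quote and escape a string value for JSON output
  private def q(s: String): String = "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\""

  // Hypothetical helper: renders a Restaurant as JSON text mirroring the document
  // nesting (embedded address document, coord as an array)
  def toDocumentJson(r: Restaurant): String = {
    val a = r.address
    val coords = a.coord.mkString(", ")
    val address = s"""{"building": ${q(a.building)}, "coord": [$coords], "street": ${q(a.street)}, "zipcode": ${q(a.zipcode)}}"""
    s"""{"address": $address, "borough": ${q(r.borough)}, "cuisine": ${q(r.cuisine)}, "name": ${q(r.name)}}"""
  }

  def main(args: Array[String]): Unit = {
    val vella = Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"),
      "Manhattan", "Italian", "Vella")
    println(toDocumentJson(vella))
  }
}
```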
How to read documents from a Mongo collection with Spark Scala?
Code example
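```scala
import com.mongodb.spark.MongoSpark

// Reading the collection configured in spark.mongodb.input.* into a DataFrame
val df = MongoSpark.load(sqlContext)
// The schema is inferred by sampling documents from the collection
logger.info(s"Schema read from Mongo:\n${df.schema.treeString}")
// show() prints the first rows to stdout and returns Unit, so call it directly
// instead of passing its result to logger.info
df.show()
logger.info("Reading documents from Mongo: OK")
```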