Spark Scala - Read & Write files from MongoDB
GitHub project: example-spark-scala-read-and-write-from-mongo
Common part
sbt Dependencies
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.0"
assembly Dependency
// In build.sbt
import sbt.Keys._

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
Mongo URI
A Mongo URI looks like this: mongodb://username:password@host:27017/database
The default port is 27017.
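As a quick illustration (the values below are placeholders, not taken from the project), such a URI can be assembled from its parts in Scala:

// Illustrative only: assembling a Mongo URI from placeholder values
val username = "myUser"
val password = "myPassword"
val host     = "localhost"
val database = "mydb"
val mongoUri = s"mongodb://$username:$password@$host:27017/$database"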
Init SparkSession
You need to define an input and an output collection depending on your needs.
These collections can be different.
To read from a collection, you need to choose a Partitioner that defines how documents are partitioned in the resulting dataframe.
See the documentation for the list of available Partitioners: https://docs.mongodb.com/spark-connector/configuration/
import org.apache.spark.sql.SparkSession

// Creation of the SparkSession
val sparkSession = SparkSession.builder()
  .appName("example-spark-scala-read-and-write-from-mongo")
  // Configuration for writing to a Mongo collection
  .config("spark.mongodb.output.uri", params.mongoUri)
  .config("spark.mongodb.output.collection", "restaurants")
  // Configuration for reading a Mongo collection
  .config("spark.mongodb.input.uri", params.mongoUri)
  .config("spark.mongodb.input.collection", "restaurants")
  // Partitioner used to split the Documents of the collection into dataframe partitions
  .config("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
  // Number of partitions in the resulting dataframe
  .config("spark.mongodb.input.partitionerOptions.MongoPaginateByCountPartitioner.numberOfPartitions", "1")
  .getOrCreate()
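The snippet above reads the URI from params.mongoUri. As a minimal sketch (the Params case class and parseArgs helper below are hypothetical, not part of the connector or of the project), the URI could simply be taken from the command-line arguments:

// Hypothetical holder for the job parameters
case class Params(mongoUri: String)

// Minimal sketch: the Mongo URI is expected as the first program argument
def parseArgs(args: Array[String]): Params = {
  require(args.nonEmpty, "Expected the Mongo URI as first argument")
  Params(mongoUri = args(0))
}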
How to write documents to a Mongo collection with Spark Scala?
Code example
import com.mongodb.spark.MongoSpark
import sparkSession.implicits._ // needed for .toDF()

// Case classes representing the Documents
case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

// Creation of a dataframe containing the documents
val dfRestaurants = Seq(
  Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"), "Manhattan", "Italian", "Vella"),
  Restaurant(Address("1007", Array(-73.856077, 40.848447), "Morris Park Ave", "10462"), "Bronx", "Bakery", "Morris Park Bake Shop")
).toDF().coalesce(1)

// Writing the dataframe to the Mongo collection
MongoSpark.save(dfRestaurants.write.mode("overwrite"))
logger.info("Writing documents in Mongo : OK")
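Alternatively, the same write can be expressed with Spark's generic DataFrameWriter API. The sketch below assumes the 2.x connector's com.mongodb.spark.sql.DefaultSource data source name and reuses the output configuration already set on the SparkSession:

// Sketch: writing through the DataFrameWriter API instead of MongoSpark.save
dfRestaurants.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("overwrite")
  .option("collection", "restaurants")
  .save()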
How to read documents from a Mongo collection with Spark Scala?
Code example
// Reading the Mongo collection into a dataframe
val df = MongoSpark.load(sparkSession)

// show() prints the dataframe to stdout (it returns Unit, so it is not passed to the logger)
df.show()
logger.info("Reading documents from Mongo : OK")
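Since the collection was written from the Restaurant case class, the loaded dataframe can also be mapped back to a typed Dataset. A minimal sketch, assuming the same case classes are in scope:

// Sketch: turning the loaded dataframe into a typed Dataset[Restaurant]
import sparkSession.implicits._

val restaurants = df.as[Restaurant]
restaurants.filter(_.cuisine == "Bakery").show()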