Spark Scala - Read & Write files from MongoDB

GitHub project: example-spark-scala-read-and-write-from-mongo

Common part

sbt Dependencies

libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.0"

assembly Dependency

// In build.sbt
import sbt.Keys._
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")

Mongo URI

A Mongo URI has the following form: mongodb://username:password@host:27017/database

Default port is 27017.
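To make the parts of the URI concrete, here is a minimal sketch that splits a single-host Mongo URI with java.net.URI and falls back to the default port when none is given. MongoTarget and MongoUriParts are hypothetical names used only for illustration; they are not part of the connector, and the sketch assumes a single host (no replica-set host list).

```scala
import java.net.URI

// Hypothetical holder for the parts of a single-host Mongo URI (illustration only)
final case class MongoTarget(user: Option[String], host: String, port: Int, database: String)

object MongoUriParts {
  val DefaultPort = 27017

  def parse(uri: String): MongoTarget = {
    val u = new URI(uri)
    require(u.getScheme == "mongodb", s"not a mongodb:// URI: $uri")
    // getUserInfo is "username:password", or null when no credentials are given
    val user = Option(u.getUserInfo).map(_.takeWhile(_ != ':'))
    // java.net.URI reports -1 when the port is omitted
    val port = if (u.getPort == -1) DefaultPort else u.getPort
    MongoTarget(user, u.getHost, port, u.getPath.stripPrefix("/"))
  }
}
```

For example, MongoUriParts.parse("mongodb://localhost/test") yields host localhost, port 27017 and database test.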

Init SparkSession

You need to define an input collection and an output collection, depending on your needs.

These collections can be different.

To read from a collection, you need to choose a Partitioner, which defines how the documents are split into partitions in the resulting DataFrame.

See the documentation for a list of all available Partitioners: https://docs.mongodb.com/spark-connector/configuration/

import org.apache.spark.sql.SparkSession

// Creation of SparkSession (params.mongoUri holds the Mongo URI described above)
val sparkSession = SparkSession.builder()
  .appName("example-spark-scala-read-and-write-from-mongo")
  // Configuration for writing in a Mongo collection
  .config("spark.mongodb.output.uri", params.mongoUri)
  .config("spark.mongodb.output.collection", "restaurants")
  // Configuration for reading a Mongo collection
  .config("spark.mongodb.input.uri", params.mongoUri)
  .config("spark.mongodb.input.collection", "restaurants")
  // Partitioner used to split the collection's documents into DataFrame partitions
  .config("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
  // Number of partitions in the resulting dataframe
  .config("spark.mongodb.input.partitionerOptions.numberOfPartitions", "1")
  .getOrCreate()
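The session above reads from and writes to the same collection. As a minimal sketch of the case where they differ, the helper below collects the four connector settings in a plain Map. MongoSettings and forCollections are hypothetical names, and the collection name restaurants_copy in the usage note is only an example.

```scala
object MongoSettings {
  // Hypothetical helper, not part of the connector: returns the Spark settings
  // for reading from one collection and writing to another on the same URI.
  def forCollections(uri: String, input: String, output: String): Map[String, String] = Map(
    "spark.mongodb.input.uri" -> uri,
    "spark.mongodb.input.collection" -> input,
    "spark.mongodb.output.uri" -> uri,
    "spark.mongodb.output.collection" -> output
  )
}
```

Each entry of MongoSettings.forCollections(params.mongoUri, "restaurants", "restaurants_copy") can then be passed to the builder with .config(key, value), exactly like the hard-coded settings above.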

How to write documents to a Mongo collection with Spark Scala?

Code example

import com.mongodb.spark.MongoSpark
// Needed for .toDF() on a Seq of case classes
import sparkSession.implicits._

// Creating classes representing Documents
case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

// Creating a dataframe containing documents
val dfRestaurants = Seq(
  Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"), "Manhattan", "Italian", "Vella"),
  Restaurant(Address("1007", Array(-73.856077, 40.848447), "Morris Park Ave", "10462"), "Bronx", "Bakery", "Morris Park Bake Shop")
).toDF().coalesce(1)

// Writing the dataframe to the Mongo collection
MongoSpark.save(dfRestaurants.write.mode("overwrite"))
logger.info("Writing documents in Mongo : OK")
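The example above uses the "overwrite" mode, which, as far as I understand the connector, replaces the existing content of the collection. When existing documents should be kept, a variant along the following lines may be more appropriate. This is a sketch only: RestaurantWriter and append are hypothetical names, and it assumes the output URI is already set on the session as shown earlier.

```scala
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.{DataFrame, SaveMode}

object RestaurantWriter {
  // Hypothetical helper: appends the rows of `df` to `collection`
  // instead of replacing what the collection already contains.
  // The "collection" option overrides spark.mongodb.output.collection for this write.
  def append(df: DataFrame, collection: String): Unit =
    MongoSpark.save(df.write.option("collection", collection).mode(SaveMode.Append))
}
```

With the dataframe built above, the call would be RestaurantWriter.append(dfRestaurants, "restaurants").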

How to read documents from a Mongo collection with Spark Scala?

Code example

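import com.mongodb.spark.MongoSpark
// Reading the MongoDB collection into a dataframe
val df = MongoSpark.load(sparkSession)
// show() prints to stdout and returns Unit, so it is called on its own rather than passed to the logger
df.show()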
logger.info("Reading documents from Mongo : OK")
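Called without a type parameter, MongoSpark.load infers the schema by sampling the collection. When the document shape is known, the case classes used for writing can supply the schema instead. The following is a minimal sketch under that assumption; RestaurantReader and loadTyped are hypothetical names, and the case classes are repeated only to keep the snippet self-contained.

```scala
import com.mongodb.spark.MongoSpark
import org.apache.spark.sql.{Dataset, SparkSession}

// Same shape as the documents written earlier
case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

object RestaurantReader {
  // Hypothetical helper: loads the configured input collection using the schema
  // of Restaurant rather than an inferred one, then exposes it as a typed Dataset.
  def loadTyped(spark: SparkSession): Dataset[Restaurant] = {
    import spark.implicits._
    MongoSpark.load[Restaurant](spark).as[Restaurant]
  }
}
```

RestaurantReader.loadTyped(sparkSession) can then be filtered with ordinary Scala lambdas, for example .filter(_.borough == "Bronx").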