GitHub project: example-spark-scala-read-and-write-from-mongo
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % "provided" libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided" libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.0" |
```scala
// In build.sbt
import sbt.Keys._

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
```
```scala
// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
```
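With this setup, `sbt assembly` produces a fat jar, and Spark itself is kept out of it through the "provided" scope. If you launch the job with spark-submit, you can optionally pin the entry point in build.sbt; the fully-qualified class name below is a hypothetical placeholder for your own main object:

```scala
// In build.sbt (optional): declare the main class written into the fat jar's manifest.
// The class name here is a hypothetical placeholder, not part of this example project.
mainClass in assembly := Some("io.example.ReadAndWriteFromMongo")
```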
Mongo URIs look like this: mongodb://username:password@host:27017/database
The default port is 27017.
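In the snippets below, params.mongoUri is assumed to carry such a URI; a hypothetical value would be:

```scala
// Hypothetical URI matching the pattern above (host, credentials and database are placeholders)
val mongoUri = "mongodb://spark_user:s3cret@mongodb.example.com:27017/restaurants_db"
```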
You need to define an input and an output collection, depending on your needs.
These collections can be different.
To read from a collection, you need to choose a partitioner, which defines how documents are distributed across the partitions of the resulting dataframe.
See the documentation for the list of available partitioners: https://docs.mongodb.com/spark-connector/configuration/
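The partitioner can be set globally on the SparkSession, as in the next snippet, or overridden for a single read through a ReadConfig. A sketch of the latter, assuming the sparkSession defined below and MongoSamplePartitioner as the alternative partitioner:

```scala
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

// A sketch: overriding the partitioner for one read instead of on the SparkSession.
// Options not listed here fall back to the session-level input configuration.
val readConfig = ReadConfig(
  Map("collection" -> "restaurants", "partitioner" -> "MongoSamplePartitioner"),
  Some(ReadConfig(sparkSession))
)
val sampledDf = MongoSpark.load(sparkSession, readConfig)
```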
```scala
import org.apache.spark.sql.SparkSession

// Creation of the SparkSession
val sparkSession = SparkSession.builder()
  .appName("example-spark-scala-read-and-write-from-mongo")
  // Configuration for writing to a Mongo collection
  .config("spark.mongodb.output.uri", params.mongoUri)
  .config("spark.mongodb.output.collection", "restaurants")
  // Configuration for reading a Mongo collection
  .config("spark.mongodb.input.uri", params.mongoUri)
  .config("spark.mongodb.input.collection", "restaurants")
  // Type of partitioner used to split documents into dataframe partitions
  .config("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
  // Number of partitions in the resulting dataframe
  .config("spark.mongodb.input.partitionerOptions.MongoPaginateByCountPartitioner.numberOfPartitions", "1")
  .getOrCreate()
```
```scala
import com.mongodb.spark.MongoSpark
import sparkSession.implicits._

// Case classes representing the documents
case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

// Creating a dataframe containing the documents
val dfRestaurants = Seq(
  Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"), "Manhattan", "Italian", "Vella"),
  Restaurant(Address("1007", Array(-73.856077, 40.848447), "Morris Park Ave", "10462"), "Bronx", "Bakery", "Morris Park Bake Shop")
).toDF().coalesce(1)

// Writing the dataframe to the Mongo collection
MongoSpark.save(dfRestaurants.write.mode("overwrite"))
logger.info("Writing documents in Mongo : OK")
```
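MongoSpark.save also accepts an explicit WriteConfig when you want to target a collection other than the one configured on the session. A sketch, where the restaurants_backup collection name is an assumption for illustration:

```scala
import com.mongodb.spark.config.WriteConfig

// A sketch: writing the same dataframe to another collection by overriding
// the session-level output configuration (the collection name is hypothetical).
val writeConfig = WriteConfig(
  Map("collection" -> "restaurants_backup", "writeConcern.w" -> "majority"),
  Some(WriteConfig(sparkSession))
)
MongoSpark.save(dfRestaurants.write.mode("overwrite"), writeConfig)
```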
```scala
// Reading the MongoDB collection into a dataframe
val df = MongoSpark.load(sparkSession)
// show() prints the dataframe and returns Unit, so it cannot be passed to the logger
df.show()
logger.info("Reading documents from Mongo : OK")
```
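Since the read schema matches the case classes above, the loaded dataframe can also be mapped back to a typed Dataset. A minimal sketch, assuming columns absent from the case class (such as Mongo's _id) are simply ignored by the encoder:

```scala
import sparkSession.implicits._

// A sketch: typed view over the loaded dataframe for compile-time field access
val restaurants = df.as[Restaurant]
restaurants.filter(_.borough == "Bronx").show()
```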