Spark Scala - Read & Write files from MongoDB
GitHub project: example-spark-scala-read-and-write-from-mongo
Common part
sbt Dependencies
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided"
libraryDependencies += "org.mongodb.spark" %% "mongo-spark-connector" % "2.1.0"
assembly Dependency
// In build.sbt
import sbt.Keys._

assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)

// In project/assembly.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
Mongo URI
Mongo URIs look like this: mongodb://username:password@host:27017/database
The default port is 27017.
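As an illustration, the snippet below assembles such a URI from its parts. All values (user, password, host, database) are hypothetical and only meant to show the expected shape of the string:

// Hypothetical values for illustration only
val user = "spark_app"
val password = "s3cret"
val host = "mongodb-01.example.com"
val database = "nyc_food"

// Resulting URI: mongodb://spark_app:s3cret@mongodb-01.example.com:27017/nyc_food
val mongoUri = s"mongodb://$user:$password@$host:27017/$database"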
Init SparkContext and SQLContext
You need to define an input and an output collection, depending on your needs.
These collections can be different.
To read from a collection, you need to choose a partitioner that defines how the documents are partitioned in the resulting dataframe.
See the documentation for a list of all available partitioners: https://docs.mongodb.com/spark-connector/configuration/
// Creation of SparkSession
val sparkSession = SparkSession.builder()
  .appName("example-spark-scala-read-and-write-from-mongo")
  // Configuration for writing in a Mongo collection
  .config("spark.mongodb.output.uri", params.mongoUri)
  .config("spark.mongodb.output.collection", "restaurants")
  // Configuration for reading a Mongo collection
  .config("spark.mongodb.input.uri", params.mongoUri)
  .config("spark.mongodb.input.collection", "restaurants")
  // Type of Partitioner to use to transform Documents to a dataframe
  .config("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
  // Number of partitions in the resulting dataframe
  .config("spark.mongodb.input.partitionerOptions.MongoPaginateByCountPartitioner.numberOfPartitions", "1")
  .getOrCreate()
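In the snippet above, params.mongoUri is assumed to come from the application's own argument parsing; it is not provided by Spark or the connector. A minimal sketch of what such a parameter holder could look like (names are illustrative, not part of the connector API):

// Hypothetical parameter holder, e.g. filled from the job's command-line arguments
case class Params(mongoUri: String)

def parseArgs(args: Array[String]): Params =
  Params(mongoUri = args(0))

// val params = parseArgs(args)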
How to write a file to a Mongo collection with Spark Scala?
Code example
import com.mongodb.spark.MongoSpark
// Needed for the implicit toDF() conversion
import sparkSession.implicits._

// Creating classes representing Documents
case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

// Creating a dataframe containing documents
val dfRestaurants = Seq(
  Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"), "Manhattan", "Italian", "Vella"),
  Restaurant(Address("1007", Array(-73.856077, 40.848447), "Morris Park Ave", "10462"), "Bronx", "Bakery", "Morris Park Bake Shop")
).toDF().coalesce(1)

// Writing dataframe in Mongo collection
MongoSpark.save(dfRestaurants.write.mode("overwrite"))
logger.info("Writing documents in Mongo : OK")
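As an alternative (not shown in the original project), the same write can usually be expressed through the plain DataFrameWriter API of the connector. The sketch below assumes the SparkSession configuration from the previous section; the "collection" option simply overrides spark.mongodb.output.collection if needed:

// Equivalent write using the connector's data source
dfRestaurants
  .write
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("overwrite")
  .option("collection", "restaurants")
  .save()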
How to read documents from a Mongo collection with Spark Scala?
Code example
// Reading Mongodb collection into a dataframe
val df = MongoSpark.load(sparkSession)
df.show()
logger.info("Reading documents from Mongo : OK")
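Once loaded, the dataframe behaves like any other Spark dataframe, so standard column expressions apply (the connector typically pushes such filters down to MongoDB). The example below is a simple illustration, assuming the schema inferred from the restaurants collection contains the borough, name, cuisine and nested address fields used in the write example above:

import sparkSession.implicits._

// Keep only Manhattan restaurants and inspect a few columns
df.filter($"borough" === "Manhattan")
  .select("name", "cuisine", "address.street")
  .show()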
Related content
Spark Scala - Read & Write files from HDFS
Spark Scala - Read & Write files from Hive
Pyspark - Read & Write files from HDFS
Pyspark - Read & Write files from Hive
Apache Spark
R - Read & Write files from MongoDB