GitHub project: example-spark-scala-read-and-write-from-hdfs
// In build.sbt: Spark dependencies.
// Scoped "provided" because the Spark cluster supplies these jars at runtime,
// so they must not be bundled into the assembled fat jar.
// FIX: the two settings were fused onto a single line with no separator,
// which is not valid sbt syntax; each += must be its own statement.
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.1.0" % "provided"
libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided"
// In build.sbt
import sbt.Keys._

// Exclude the Scala standard library from the fat jar produced by sbt-assembly:
// the Spark cluster already provides it, and shipping a second copy risks
// version clashes and bloats the artifact.
assemblyOption in assembly := (assemblyOption in assembly).value.copy(includeScala = false)
// In project/assembly.sbt
// Registers the sbt-assembly plugin, which adds the `assembly` task used
// to build a single runnable fat jar for spark-submit.
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.1")
HDFS URIs have the following form: hdfs://namenodedns:port/user/hdfs/folder/file.csv
The default NameNode port is 8020.
// Entry point to the Spark SQL API: reuse an existing session or build a new one.
val sparkSession = SparkSession
  .builder()
  .appName("example-spark-scala-read-and-write-from-hdfs")
  .getOrCreate()
// Minimal one-field record used to populate the example DataFrame.
case class HelloWorld(message: String)

// ====== Creating a dataframe with 1 partition
// coalesce(1) forces a single partition so each write below produces one output file.
// NOTE(review): .toDF() on a Seq requires `import sparkSession.implicits._` in scope —
// confirm the import appears earlier in the original file.
val df = Seq(HelloWorld("helloworld")).toDF().coalesce(1)

// ======= Writing files
// Writing Dataframe as parquet file (Overwrite replaces any existing output at the path)
df.write.mode(SaveMode.Overwrite).parquet(hdfs_master + "user/hdfs/wiki/testwiki")
// Writing Dataframe as csv file
df.write.mode(SaveMode.Overwrite).csv(hdfs_master + "user/hdfs/wiki/testwiki.csv")
// ======= Reading files
// Reading parquet files into a Spark Dataframe
// FIX: the original called `session.read.parquet(...)`, but no `session` value is
// defined anywhere in this document — the SparkSession created above is named
// `sparkSession` (the csv read on the next line already uses it).
val df_parquet = sparkSession.read.parquet(hdfs_master + "user/hdfs/wiki/testwiki")
// Reading csv files into a Spark Dataframe
// "inferSchema" makes Spark sample the data to guess column types instead of
// reading every column as String.
val df_csv = sparkSession.read.option("inferSchema", "true").csv(hdfs_master + "user/hdfs/wiki/testwiki.csv")