R - Read & Write files from HDFS
GitHub project: https://github.com/saagie/example-R-read-and-write-from-hdfs
Official API Documentation
This code is based on the official documentation of the WebHDFS REST API, available at: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
Dependencies
R package
httr : https://cran.r-project.org/web/packages/httr/index.html
Used to execute the HTTP requests in the write function.
Linux package
libcurl4-openssl-dev : https://packages.debian.org/fr/sid/libcurl4-openssl-dev
Dependency for the httr package.
Parameters
hdfsUri : URL used to communicate with WebHDFS. The format is the following: http://namenodedns:port/webhdfs/v1
fileUri : Path of the file to read, or where to write the file, e.g. /user/username/myfile.csv
optionnalParameters : Optional parameters to send to the API, in the following format: &name1=value1&name2=value2. The list of all options is available in the API documentation linked above.
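For the read case, these pieces might be assembled as follows. This is only a sketch: the host, port, and user name below are placeholder values, and `readParameter` is an extra variable used by the read snippet later in this document.

```r
# Placeholder values -- adapt to your own cluster and user
hdfsUri <- "http://namenodedns:50070/webhdfs/v1"
fileUri <- "/user/username/myfile.csv"
readParameter <- "?op=OPEN"
usernameParameter <- "&user.name=MYUSERNAME"
optionnalParameters <- ""  # e.g. "&buffersize=131072"

# Concatenate everything into the final request URI
uri <- paste0(hdfsUri, fileUri, readParameter, usernameParameter, optionnalParameters)
# "http://namenodedns:50070/webhdfs/v1/user/username/myfile.csv?op=OPEN&user.name=MYUSERNAME"
```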
Code explanation for reading a file
Url creation
Create the full URL from all the parameters. The full URL looks like this:
http://namenodedns:port/webhdfs/v1/user/username/myfile.csv?user.name=MYUSERNAME&op=OPEN
```r
# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, readParameter, usernameParameter, optionnalParameters)
```
Reading data
Most R functions used to read data can also read from a connection instead of a file.
```r
# Read your file with the function you want, as long as it supports reading from a connection
data <- read.csv(uri)
```
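This behavior can be checked locally without a cluster: read.csv accepts any connection, e.g. a textConnection over an in-memory string (the sample CSV below is made up):

```r
# A small in-memory CSV, standing in for the WebHDFS response body
csv_text <- "name,value\nfoo,1\nbar,2\n"

# read.csv reads from the connection exactly as it would from a file
data <- read.csv(textConnection(csv_text))

nrow(data)   # 2
names(data)  # "name" "value"
```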
Reading non-data file
To download an archive or other non-data file, the download.file function is needed.
```r
# mode = 'wb' keeps the download binary-safe (required on Windows)
download.file('http://namenodedns:port/webhdfs/v1/user/username/myfolder.tar.gz?op=OPEN',
              destfile = 'localFile.tar.gz', mode = 'wb')
```
Code explanation for writing a file
Dependencies
Install the dependencies (including system packages) if necessary.
```r
# Choose a CRAN mirror to use (48 = Paris / France)
chooseCRANmirror(ind = 48)

# Install dependencies if necessary
if (!require(httr)) {
  # System dependency for the curl package
  # system("sudo apt-get install -y libcurl4-openssl-dev")
  install.packages("httr")
  library(httr)
}
```
Url creation
Create the full URL from all the parameters. The full URL looks like this:
http://namenodedns:port/webhdfs/v1/user/username/myfile.csv?user.name=MYUSERNAME&op=CREATE&overwrite=true
```r
# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, usernameParameter, writeParameter, optionnalParameters)
```
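Filling in placeholder values (host, port, and user name are assumptions to adapt to your cluster), the write-side URI comes out like the sample URL above. Note that user.name comes first here, so it carries the `?` prefix while `op=CREATE` uses `&`:

```r
# Placeholder values -- adapt to your own cluster and user
hdfsUri <- "http://namenodedns:50070/webhdfs/v1"
fileUri <- "/user/username/myfile.csv"
usernameParameter <- "?user.name=MYUSERNAME"
writeParameter <- "&op=CREATE"
optionnalParameters <- "&overwrite=true"

# Concatenate everything into the final request URI
uri <- paste0(hdfsUri, fileUri, usernameParameter, writeParameter, optionnalParameters)
# "http://namenodedns:50070/webhdfs/v1/user/username/myfile.csv?user.name=MYUSERNAME&op=CREATE&overwrite=true"
```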
Ask the namenode where to write the file
Writing a file is done in two steps. First, a request is sent to the namenode to determine on which datanode the file must be stored. The namenode returns the address of the datanode concerned. Then, the file must be uploaded to the address returned by the namenode.
```r
# Ask the namenode on which datanode to write the file
response <- PUT(uri)

# Get the url of the datanode returned by hdfs
uriWrite <- response$url
```
Uploading a file
In order to be uploaded, the data must first be written to disk. If your data is already on disk, leave it that way; otherwise, write it with your favorite function, e.g.
```r
write.csv(data, row.names = FALSE, file = "tmp.csv")
```
Other methods have been tested, such as converting the object in RAM or writing to an in-memory connection, but for some reason writing to disk is faster.
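A small refinement (an assumption on my part, not part of the original example): tempfile() yields a unique path in the session's temporary directory, so parallel jobs won't overwrite each other's tmp.csv:

```r
# Hypothetical sample data; in practice this is the object you want to upload
data <- data.frame(x = 1:3, y = c("a", "b", "c"))

# tempfile() returns a unique scratch path for this R session
tmpPath <- tempfile(fileext = ".csv")
write.csv(data, file = tmpPath, row.names = FALSE)

# The upload body would then be upload_file(tmpPath) instead of "tmp.csv"
```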
Once the file is written, it needs to be uploaded.
```r
# Upload the file with a PUT request
responseWrite <- PUT(uriWrite, body = upload_file("tmp.csv"))
```
List files in a directory
```r
library(tidyverse)
library(httr)

filelist <-
  # LISTSTATUS is the webHDFS equivalent of `$ ls`
  paste0(hdfsUri, dirUri, "?op=LISTSTATUS") %>%
  GET() %>%
  content(type = "application/json") %>%
  pluck(1, 1) %>%
  map_dfr(as_tibble)

glimpse(filelist)
# Observations: 16
# Variables: 13
# $ accessTime       <dbl> 1.527669e+12, 1.527588e+12, 1.527588e+12, 1.527588...
# $ blockSize        <int> 134217728, 134217728, 134217728, 134217728, 134217...
# $ childrenNum      <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
# $ fileId           <int> 878668, 878659, 878669, 878672, 878670, 878663, 87...
# $ group            <chr> "foo.bar", "foo.bar", "foo.bar", "f...
# $ length           <int> 6590121, 37314676, 2174591, 7325908, 4599335, 1677...
# $ modificationTime <dbl> 1.527582e+12, 1.527582e+12, 1.527582e+12, 1.527582...
# $ owner            <chr> "foo.bar", "foo.bar", "foo.bar", "f...
# $ pathSuffix       <chr> "baz_qux_20180524.TXT", "baz_qux_20180525.TXT", ...
# $ permission       <chr> "755", "755", "755", "755", "755", "755", "755", "...
# $ replication      <int> 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3
# $ storagePolicy    <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
# $ type             <chr> "FILE", "FILE", "FILE", "FILE", "FILE", "FILE", "F...

filelist %>%
  # "pathSuffix" is the actual file names
  pull("pathSuffix")
# [1] "baz_qux_20180524.TXT" "baz_qux_20180525.TXT"
# [3] ...
```