GitHub project: https://github.com/saagie/example-R-read-and-write-from-hdfs
This code is based on the official documentation of the WebHDFS API, available at: https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html
httr: https://cran.r-project.org/web/packages/httr/index.html
Used to execute the cURL requests in the write function.
libcurl4-openssl-dev: https://packages.debian.org/fr/sid/libcurl4-openssl-dev
System dependency of the httr package.
hdfsUri: URL used to communicate with WebHDFS, in the format http://namenodedns:port/webhdfs/v1
fileUri: Path of the file to read, or where to write the file, e.g. /user/username/myfile.csv
optionnalParameters: Optional parameters to send to the API, in the format &name1=value1&name2=value2. The full list of options is available in the API documentation linked above.
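As a concrete sketch, these parameters could be set up as follows. The host, port, and user name are placeholders (adapt them to your cluster), and `readParameter`/`usernameParameter` are the query-string pieces used in the concatenation:

```r
# All values below are placeholders; adapt them to your cluster
hdfsUri <- "http://namenodedns:50070/webhdfs/v1"  # 50070 is a common default WebHDFS port
fileUri <- "/user/myuser/myfile.csv"              # file to read on HDFS
readParameter <- "?op=OPEN"                       # operation to perform
usernameParameter <- "&user.name=myuser"          # identity used by WebHDFS
optionnalParameters <- ""                         # e.g. "&buffersize=4096"
```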
Create the full URL from all the parameters. The full URL looks like this:
http://namenodedns:port/webhdfs/v1/user/username/myfile.csv?user.name=MYUSERNAME&op=OPEN
# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, readParameter, usernameParameter, optionnalParameters)
Most R functions used to read data can also read from a connection instead of a file.
# Read your file with the function you want, as long as it supports reading from a connection
data <- read.csv(uri)
To download an archive (e.g. tar.gz, zip) or other non-data files, the download.file method is needed.
download.file('http://namenodedns:port/webhdfs/v1/user/username/myfolder.tar.gz?op=OPEN', destfile = 'localFile.tar.gz')
Install the dependencies (including system dependencies) if necessary.
# Choose a CRAN mirror to use (48 = Paris / France)
chooseCRANmirror(ind = 48)

# Install dependencies if necessary
if (!require(httr)) {
  # System dependency for the curl package
  # system("sudo apt-get install -y libcurl4-openssl-dev")
  install.packages("httr")
  library(httr)
}
Create the full URL from all the parameters. The full URL looks like this:
http://namenodedns:port/webhdfs/v1/user/username/myfile.csv?user.name=MYUSERNAME&op=CREATE&overwrite=true
# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, usernameParameter, writeParameter, optionnalParameters)
Writing a file is done in two steps. First, a request is sent to the namenode to determine on which datanode the file must be stored. The namenode returns the address of the datanode concerned. Then, the file must be uploaded to the address returned by the namenode.
# Ask the namenode on which datanode to write the file
response <- PUT(uri)
# Get the url of the datanode returned by hdfs
uriWrite <- response$url
In order to be uploaded, the data must be written to disk. If your data is already on disk, leave it there; otherwise, write it with your favorite function, e.g.
write.csv(data, row.names = FALSE, file = "tmp.csv")
Other methods have been tested, such as converting the object in RAM or writing to an in-memory connection, but for some reason writing to disk is faster.
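For reference, an in-memory variant of this step can be sketched with base R's `textConnection` (the small `data` frame below is a placeholder for the data frame being uploaded):

```r
# `data` stands for the data frame to upload (small placeholder here)
data <- data.frame(x = 1:2, y = c("a", "b"))

# Write the CSV into a character vector in RAM instead of a file on disk
tc <- textConnection("csvString", open = "w")
write.csv(data, tc, row.names = FALSE)
close(tc)

# csvString now holds one CSV line per element and could be sent as the
# request body; in practice, though, writing to disk proved faster
```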
Once the file is written, it needs to be uploaded.
# Upload the file with a PUT request
responseWrite <- PUT(uriWrite, body = upload_file("tmp.csv"))
library(tidyverse)
library(httr)

filelist <-
  # LISTSTATUS is the webHDFS equivalent of `$ ls`
  paste0(hdfsUri, dirUri, "?op=LISTSTATUS") %>%
  GET() %>%
  content(type = "application/json") %>%
  pluck(1, 1) %>%
  map_dfr(as_tibble)

glimpse(filelist)
# Observations: 16
# Variables: 13
# $ accessTime       <dbl> 1.527669e+12, 1.527588e+12, 1.527588e+12, 1.527588...
# $ blockSize        <int> 134217728, 134217728, 134217728, 134217728, 134217...
# $ childrenNum      <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
# $ fileId           <int> 878668, 878659, 878669, 878672, 878670, 878663, 87...
# $ group            <chr> "foo.bar", "foo.bar", "foo.bar", "f...
# $ length           <int> 6590121, 37314676, 2174591, 7325908, 4599335, 1677...
# $ modificationTime <dbl> 1.527582e+12, 1.527582e+12, 1.527582e+12, 1.527582...
# $ owner            <chr> "foo.bar", "foo.bar", "foo.bar", "f...
# $ pathSuffix       <chr> "baz_qux_20180524.TXT", "baz_qux_20180525.TXT", ...
# $ permission       <chr> "755", "755", "755", "755", "755", "755", "755", "...
# $ replication      <int> 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3
# $ storagePolicy    <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
# $ type             <chr> "FILE", "FILE", "FILE", "FILE", "FILE", "FILE", "F...

filelist %>%
  # "pathSuffix" is the actual file names
  pull("pathSuffix")
# [1] "baz_qux_20180524.TXT" "baz_qux_20180525.TXT"
# [3] ...
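The file names obtained this way can be turned back into `?op=OPEN` URLs for reading each file in the directory. A sketch with base R only; the `hdfsUri`, `dirUri`, and `filelist` values below are placeholders standing in for the objects from the listing step:

```r
# Placeholders standing in for the objects built in the listing step
hdfsUri  <- "http://namenodedns:50070/webhdfs/v1"  # placeholder namenode
dirUri   <- "/user/myuser/mydir"                   # placeholder directory
filelist <- data.frame(pathSuffix = c("baz_qux_20180524.TXT", "baz_qux_20180525.TXT"))

# One ?op=OPEN url per file listed in the directory
fileUris <- paste0(hdfsUri, dirUri, "/", filelist$pathSuffix, "?op=OPEN")

# Each url can then be read like a single file, e.g.
# alldata <- lapply(fileUris, read.csv)
```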