R - Read & Write files from HDFS


GitHub project: https://github.com/saagie/example-R-read-and-write-from-hdfs


Official API Documentation

This code is based on the official documentation for the WebHDFS API, available at https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/WebHDFS.html

Dependencies

R package

httr : https://cran.r-project.org/web/packages/httr/index.html

Used to execute the curl requests in the write function.

Linux package

libcurl4-openssl-dev : https://packages.debian.org/fr/sid/libcurl4-openssl-dev

Dependency for the httr package.

Parameters

hdfsUri : URL used to communicate with WebHDFS, in the following format: http://namenodedns:port/webhdfs/v1

fileUri : Path of the file to read, or where to write the file, e.g. /user/username/myfile.csv

optionnalParameters : Optional parameters to send to the API, in the format &name1=value1&name2=value2. The full list of options is available in the API documentation linked above.
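As a minimal sketch, the three parameters might be defined as follows; the host, port, and file path are placeholders, not values from the original project:

```r
# Placeholder values -- replace with your own cluster and paths
hdfsUri <- "http://namenodedns:50070/webhdfs/v1"   # WebHDFS endpoint (50070 is a common default port, assumed here)
fileUri <- "/user/username/myfile.csv"             # HDFS path of the file
optionnalParameters <- ""                          # e.g. "&buffersize=4096", empty if unused
```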

Code explanation for reading a file

Url creation

Create the full URL from all the parameters. The full URL looks like this:

http://namenodedns:port/webhdfs/v1/user/username/myfile.csv?user.name=MYUSERNAME&op=OPEN
# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, readParameter, usernameParameter, optionnalParameters)
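The intermediate parameters are not defined above; one plausible definition, matching the concatenation order in the call to paste0 (the resulting query-parameter order differs from the sample URL but is equivalent), could be:

```r
# Assumed parameter values -- MYUSERNAME and the host are placeholders
hdfsUri <- "http://namenodedns:50070/webhdfs/v1"
fileUri <- "/user/username/myfile.csv"
readParameter <- "?op=OPEN"                   # WebHDFS OPEN operation
usernameParameter <- "&user.name=MYUSERNAME"  # simple pseudo-authentication
optionnalParameters <- ""

# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, readParameter, usernameParameter, optionnalParameters)
# uri is now:
# "http://namenodedns:50070/webhdfs/v1/user/username/myfile.csv?op=OPEN&user.name=MYUSERNAME"
```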

Reading data

Most R functions used to read data can also read from a connection instead of a file.

# Read your file with the function you want as long as it supports reading from a connection
data <- read.csv(uri)
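The connection mechanism itself can be seen without a cluster: textConnection turns an in-memory string into a connection, and read.csv consumes it exactly as it would the WebHDFS URL. This is purely illustrative, with made-up data and no HDFS involved:

```r
# A small CSV held in memory, read through a connection
csvText <- "name,value\nalpha,1\nbeta,2"
data <- read.csv(textConnection(csvText))

nrow(data)   # 2
names(data)  # "name" "value"
```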

Reading a non-data file

To download an archive (e.g. a tar.gz) or any other non-data file, use the download.file method.

download.file('http://namenodedns:port/webhdfs/v1/user/username/myfolder.tar.gz?op=OPEN', destfile = 'localFile.tar.gz')


Code explanation for writing a file

Dependencies

Install the dependencies (including system) if necessary.

# Choose a cran mirror to use (48 = Paris / France)
chooseCRANmirror(ind = 48)
# Install dependencies if necessary
if (!require(httr)) {
	# System dependency for the curl package
	# system("sudo apt-get install -y libcurl4-openssl-dev")
	install.packages("httr")
	library(httr)
}

Url creation

Create the full URL from all the parameters. The full URL looks like this:

http://namenodedns:port/webhdfs/v1/user/username/myfile.csv?user.name=MYUSERNAME&op=CREATE&overwrite=true
# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, usernameParameter, writeParameter, optionnalParameters)
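As with reading, the intermediate parameters are not shown; a plausible definition, matching the paste0 order used for writing (all values are placeholders), could be:

```r
# Assumed parameter values -- MYUSERNAME and the host are placeholders
hdfsUri <- "http://namenodedns:50070/webhdfs/v1"
fileUri <- "/user/username/myfile.csv"
usernameParameter <- "?user.name=MYUSERNAME"   # simple pseudo-authentication
writeParameter <- "&op=CREATE&overwrite=true"  # WebHDFS CREATE operation
optionnalParameters <- ""

# Concatenate all the parameters into one uri
uri <- paste0(hdfsUri, fileUri, usernameParameter, writeParameter, optionnalParameters)
```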

Ask the namenode where to write the file

Writing a file is done in two steps. First, a PUT request is sent to the namenode to find out which datanode should store the file. The namenode replies with the address of that datanode. The file must then be uploaded to the address returned by the namenode.

# Ask the namenode on which datanode to write the file
response <- PUT(uri)
# Get the datanode URL returned by HDFS (httr follows the redirect, so response$url is the datanode address)
uriWrite <- response$url

Uploading a file

In order to be uploaded, the data must first be written to disk. If your data is already on disk, leave it as is; otherwise, write it with your favorite function, e.g.

write.csv(data, row.names = F, file = "tmp.csv")

Other methods have been tested, such as serializing the object in RAM and writing to an in-memory connection, but writing to disk turned out to be faster.

Once the file is written, it needs to be uploaded.

# Upload the file with a PUT request
responseWrite <- PUT(uriWrite, body = upload_file("tmp.csv"))


List files in a directory

library(tidyverse)
library(httr)

filelist <-
  # LISTSTATUS is the webHDFS equivalent of `$ ls`
  paste0(hdfsUri, dirUri, "?op=LISTSTATUS") %>% 
  GET() %>% 
  content(type = "application/json") %>% 
  pluck(1, 1) %>% 
  map_dfr(as_tibble)

glimpse(filelist)
# Observations: 16
# Variables: 13
# $ accessTime       <dbl> 1.527669e+12, 1.527588e+12, 1.527588e+12, 1.527588...
# $ blockSize        <int> 134217728, 134217728, 134217728, 134217728, 134217...
# $ childrenNum      <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
# $ fileId           <int> 878668, 878659, 878669, 878672, 878670, 878663, 87...
# $ group            <chr> "foo.bar", "foo.bar", "foo.bar", "f...
# $ length           <int> 6590121, 37314676, 2174591, 7325908, 4599335, 1677...
# $ modificationTime <dbl> 1.527582e+12, 1.527582e+12, 1.527582e+12, 1.527582...
# $ owner            <chr> "foo.bar", "foo.bar", "foo.bar", "f...
# $ pathSuffix       <chr> "baz_qux_20180524.TXT", "baz_qux_20180525.TXT", ...
# $ permission       <chr> "755", "755", "755", "755", "755", "755", "755", "...
# $ replication      <int> 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3, 3
# $ storagePolicy    <int> 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
# $ type             <chr> "FILE", "FILE", "FILE", "FILE", "FILE", "FILE", "F...
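Since LISTSTATUS also returns sub-directories, it can be useful to keep only plain files before processing. This sketch filters on the type column using base R and a small made-up data frame that mimics the fields shown above:

```r
# Made-up rows mimicking the LISTSTATUS fields (illustrative only)
filelist <- data.frame(
  pathSuffix = c("baz_qux_20180524.TXT", "archive", "baz_qux_20180525.TXT"),
  type       = c("FILE", "DIRECTORY", "FILE"),
  length     = c(6590121, 0, 37314676),
  stringsAsFactors = FALSE
)

# Keep only plain files, dropping sub-directories
onlyFiles <- filelist[filelist$type == "FILE", ]
onlyFiles$pathSuffix
# [1] "baz_qux_20180524.TXT" "baz_qux_20180525.TXT"
```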

filelist %>% 
  # "pathSuffix" holds the actual file names
  pull("pathSuffix")
#  [1] "baz_qux_20180524.TXT"   "baz_qux_20180525.TXT"  
#  [3] ...
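A natural next step is to turn those names back into full WebHDFS read URLs for the reading code shown earlier. This sketch uses plain string concatenation; hdfsUri, dirUri, and the username are placeholder assumptions:

```r
hdfsUri <- "http://namenodedns:50070/webhdfs/v1"   # placeholder endpoint
dirUri  <- "/user/username/mydir"                  # placeholder directory
fileNames <- c("baz_qux_20180524.TXT", "baz_qux_20180525.TXT")

# One ?op=OPEN url per file, ready for read.csv() or download.file()
fileUris <- paste0(hdfsUri, dirUri, "/", fileNames, "?op=OPEN&user.name=MYUSERNAME")
```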