kennyut

Reputation: 3831

Using R on a cluster with Hadoop installed

I am an R user. I know very little about Linux commands, PuTTY, or Hadoop/Hive, so please correct me if I am wrong.

I am now working with a team. They have Ubuntu running on a cluster. I can use PuTTY to access this Ubuntu system and list the data files with:

 user$ hadoop fs -ls /datafolder/

or by using Hive:

 user$ hive
 hive> use datafolder;
 hive> show tables;

Conversely, the team I am working with knows very little about R, so they want me to do the R part. I have installed R on the cluster, along with rJava, RHive, and other packages. (I am not sure I did this correctly, but R seems to be running OK.)
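(For completeness: rmr2 and rhdfs are not on CRAN, so here is a sketch of how I installed them, assuming the RHadoop tarballs were already downloaded to the cluster; the file names and versions below are just examples.)

 user$ R
 > install.packages("rJava")
 > install.packages("rmr2_3.3.1.tar.gz", repos = NULL, type = "source")   ## example tarball name
 > install.packages("rhdfs_1.0.8.tar.gz", repos = NULL, type = "source")  ## example tarball name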

Now I can do some testing. I can run the following code in R on the cluster:

 user$ R
 > install.packages(c('Rcpp', 'RJSONIO', 'bitops', 'digest', 'functional', 'stringr', 'plyr', 'reshape2', 'caTools'))

 > Sys.setenv(HADOOP_CMD="/opt/cloudera/bin/hadoop")
 > Sys.setenv(HADOOP_HOME="/opt/cloudera/lib/hadoop")
 > Sys.setenv(HADOOP_STREAMING="/opt/cloudera/lib/hadoop-mapreduce/hadoop-streaming.jar")

 > library(rmr2)
 > library(rhdfs)
 > hdfs.init()

Testing:

 > ints = to.dfs(1:10)
 > calc = mapreduce(input = ints, map = function(k, v) cbind(v, v/2, 2*v))
 > test <- from.dfs(calc)
 > test

I can successfully load "test" back using from.dfs. It seems I can save a dummy dataset to Hadoop and get it back from Hadoop successfully (correct?).
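(As a cross-check, rhdfs mirrors the shell commands, so I can list HDFS directories from inside R; a sketch:)

 > hdfs.ls("/datafolder")   ## same listing as user$ hadoop fs -ls /datafolder/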

Now, my question is: how do I get R to import the datasets that I can see via

 user$ hadoop fs -ls /datafolder/

or

 hive> use datafolder;

Upvotes: 1

Views: 768

Answers (1)

yottalab

Reputation: 76

This is an example of a word count that loads the result back into R:

    ## environment: these paths are cluster-specific; adjust them to your installation
    Sys.setenv(HADOOP_CMD="/usr/bin/hadoop")
    Sys.setenv(HADOOP_STREAMING="/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop-0.20-mapreduce/contrib/streaming/hadoop-streaming.jar")
    Sys.setenv(JAVA_HOME="/usr/java/jdk1.7.0_55-cloudera")
    Sys.setenv(HADOOP_COMMON_LIB_NATIVE_DIR="/opt/cloudera/parcels/CDH-5.1.0-1.cdh5.1.0.p0.53/lib/hadoop/lib/native")
    Sys.setenv(HADOOP_OPTS="-Djava.library.path=HADOOP_HOME/lib")  ## note: HADOOP_HOME is not expanded inside this string
    library(rhdfs)
    hdfs.init()
    library(rmr2)

    ## map: split each line on whitespace and emit (word, 1) pairs
    map <- function(k, lines) {
      words.list <- strsplit(lines, '\\s')
      words <- unlist(words.list)
      return(keyval(words, 1))
    }

    ## reduce: sum the counts per word
    reduce <- function(word, counts) {
      keyval(word, sum(counts))
    }

    wordcount <- function(input, output = NULL) {
      mapreduce(input = input, output = output, input.format = "text",
                map = map, reduce = reduce)
    }

    ## HDFS paths; the output directory must not exist yet
    hdfs.root <- '/user/node'
    hdfs.data <- file.path(hdfs.root, 'data')
    hdfs.out <- file.path(hdfs.root, 'out')

    ## run the MapReduce job (timed)
    system.time(out <- wordcount(hdfs.data, hdfs.out))

    ## fetch results from HDFS into a data frame
    results <- from.dfs(out)
    results.df <- as.data.frame(results, stringsAsFactors = FALSE)
    colnames(results.df) <- c('word', 'count')

    ## sorted output: top 10 words by count
    head(results.df[order(-results.df$count), ], 10)
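To pull in the datasets already sitting under /datafolder (the original question), here is a minimal sketch that skips MapReduce, assuming the files are plain text and Hive is reachable through RHive; the file name, host, and table below are placeholders:

    ## read an HDFS text file directly into R (placeholder file name)
    raw <- from.dfs("/datafolder/part-00000", format = "text")
    lines <- values(raw)   ## the file's lines as a character vector

    ## or query Hive through RHive (placeholder host and table)
    library(RHive)
    rhive.init()
    rhive.connect(host = "127.0.0.1")
    df <- rhive.query("SELECT * FROM datafolder.mytable LIMIT 100")
    rhive.close()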

Upvotes: 1
