DanieleO

Reputation: 472

SparkR doubt and Broken pipe exception

Hi, I'm working with SparkR in distributed mode on a YARN cluster.

I have two questions:

1) If I write a script that contains both plain R code and SparkR code, will it distribute just the SparkR code, or the plain R code too?

This is what the script does: it reads a CSV, takes just the first 100k records, cleans them with plain R functions (replacing NA values with 0), and creates a SparkR DataFrame. Then, for each LINESET, it takes each TimeInterval in which that LINESET appears, sums some numeric attributes, and puts the results into a matrix.

This is the script with both R and SparkR code. It takes 7 hours in standalone mode and about 60 hours in distributed mode (where it was killed by java.net.SocketException: Broken pipe):

library(data.table)   # for fread()
library(SparkR)       # assumes sqlContext is already initialized

# Read the CSV and keep only the first 100k records
LineSmsInt<-fread("/home/sentiment/Scrivania/LineSmsInt.csv")
Short<-LineSmsInt[1:100000,]

# Clean with plain R: replace NA with 0, rescale the time column
Short[is.na(Short)] <- 0
Short$TimeInterval<-Short$TimeInterval/1000

# Create the SparkR DataFrame and compute the distinct keys locally
ShortDF<-createDataFrame(sqlContext,Short)
UniqueLineSet<-unique(Short$LINESET)
UniqueTime<-as.numeric(unique(Short$TimeInterval))

# Pre-allocate the result matrix: one row per (LINESET, TimeInterval) pair
Row<-length(UniqueLineSet)*length(UniqueTime)
IntTemp<-matrix(nrow=Row,ncol=7)
colnames(IntTemp)<-c("LINESET","TimeInterval","SmsIn","SmsOut","CallIn","CallOut","Internet")
k<-1

Sys.time()
for(i in 1:length(UniqueLineSet)){
  SubSetID<-filter(ShortDF,ShortDF$LINESET==UniqueLineSet[i])
  for(j in 1:length(UniqueTime)){
    SubTime<-filter(SubSetID,SubSetID$TimeInterval==UniqueTime[j])
    IntTemp[k,1]<-UniqueLineSet[i]
    IntTemp[k,2]<-as.numeric(UniqueTime[j])
    # every collect() below triggers a separate Spark job
    k3<-collect(select(SubTime,sum(SubTime$SmsIn)))
    IntTemp[k,3]<-k3[1,1]
    k4<-collect(select(SubTime,sum(SubTime$SmsOut)))
    IntTemp[k,4]<-k4[1,1]
    k5<-collect(select(SubTime,sum(SubTime$CallIn)))
    IntTemp[k,5]<-k5[1,1]
    k6<-collect(select(SubTime,sum(SubTime$CallOut)))
    IntTemp[k,6]<-k6[1,1]
    k7<-collect(select(SubTime,sum(SubTime$Internet)))
    IntTemp[k,7]<-k7[1,1]
    k<-k+1
  }
  print(UniqueLineSet[i])
  print(i)
}

This is the plain R script; the only things that change are the subset function and, of course, that it works on a normal R data.frame, not a SparkR DataFrame. It takes 1.30 minutes in standalone mode.
Why is it so fast in plain R and so slow in SparkR?

for(i in 1:length(UniqueLineSet)){
  # plain R: in-memory subsetting instead of SparkR filter()
  SubSetID<-subset.data.frame(LineSmsInt,LINESET==UniqueLineSet[i])
  for(j in 1:length(UniqueTime)){
    SubTime<-subset.data.frame(SubSetID,TimeInterval==UniqueTime[j])
    IntTemp[k,1]<-UniqueLineSet[i]
    IntTemp[k,2]<-as.numeric(UniqueTime[j])
    # local sums, no Spark jobs involved
    IntTemp[k,3]<-sum(SubTime$SmsIn,na.rm = TRUE)
    IntTemp[k,4]<-sum(SubTime$SmsOut,na.rm = TRUE)
    IntTemp[k,5]<-sum(SubTime$CallIn,na.rm = TRUE)
    IntTemp[k,6]<-sum(SubTime$CallOut,na.rm = TRUE)
    IntTemp[k,7]<-sum(SubTime$Internet,na.rm=TRUE)
    k<-k+1
  }
  print(UniqueLineSet[i])
  print(i)
}

2) In distributed mode, the first script was killed by:

java.net.SocketException: Broken Pipe

and this also appears sometimes:

java.net.SocketTimeoutException: Accept timed out

Could this come from a bad configuration? Any suggestions?

Thanks.

Upvotes: 2

Views: 271

Answers (1)

zero323

Reputation: 330063

Don't take this the wrong way, but it is not a particularly well-written piece of code. It is already inefficient in core R, and adding SparkR to the equation makes it even worse.

If I write a script that contains both plain R code and SparkR code, will it distribute just the SparkR code, or the plain R code too?

Unless you're using distributed data structures and functions that operate on those structures, it is just plain R code executed in a single thread on the master.
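
A minimal sketch using the names from your script (the "L1" value is made up for illustration): only the calls that take the SparkR DataFrame run on the cluster; everything else stays on the driver.

# plain R: executed in a single thread on the driver
Short[is.na(Short)] <- 0

# from here on, ShortDF is a distributed data structure
ShortDF <- createDataFrame(sqlContext, Short)

# filter()/count() on ShortDF launch distributed Spark jobs
count(filter(ShortDF, ShortDF$LINESET == "L1"))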

Why is it so fast in plain R and so slow in SparkR?

For starters, you execute a single job for each combination of LINESET, TimeInterval, and column. Each time, Spark has to scan all the records and fetch the data to the driver.

Moreover, using Spark to handle data that can easily be handled in the memory of a single machine simply doesn't make sense. The cost of scheduling and running a job in a case like this is usually much higher than the cost of the actual processing.
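
To make that cost concrete, here is a rough job count for your loop (the sizes are hypothetical placeholders, not taken from your data):

n_linesets <- length(UniqueLineSet)     # suppose ~1,000 distinct LINESET values
n_times    <- length(UniqueTime)        # suppose ~140 distinct intervals
n_jobs     <- n_linesets * n_times * 5  # five collect() calls per pair
# with these numbers: 1,000 * 140 * 5 = 700,000 Spark jobs,
# each scanning ShortDF and shipping a 1x1 result back to the driver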

Any suggestions?

If you really want to use SparkR, just use group_by and agg (note that these have to be applied to the SparkR DataFrame ShortDF, not to the local Short):

library(magrittr)  # for %>%; alternatively nest the calls

Agg <- group_by(ShortDF, ShortDF$LINESET, ShortDF$TimeInterval) %>% agg(
  sum(ShortDF$SmsIn), sum(ShortDF$SmsOut), sum(ShortDF$CallIn),
  sum(ShortDF$CallOut), sum(ShortDF$Internet))
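
The aggregated result has one row per (LINESET, TimeInterval) pair actually present in the data, so it is small enough to collect back to the driver:

AggLocal <- collect(Agg)  # small local data.frame
head(AggLocal)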

If you care about missing (LINESET, TimeInterval) pairs, fill them in using either join or unionAll, as sketched below.
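
A sketch of the join variant, assuming SparkR 1.x behavior where join() without a join expression produces a Cartesian product; the Lines, Times, AllPairs, and Filled names are illustrative:

# all (LINESET, TimeInterval) combinations, including pairs absent from the data
Lines <- distinct(select(ShortDF, "LINESET"))
Times <- distinct(select(ShortDF, "TimeInterval"))
AllPairs <- join(Lines, Times)  # Cartesian join

# left outer join keeps pairs with no records; their sums come back as NA
Filled <- join(AllPairs, Agg,
               AllPairs$LINESET == Agg$LINESET &
               AllPairs$TimeInterval == Agg$TimeInterval,
               "left_outer")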

In practice, I would simply stick with data.table and aggregate locally:

Short[, lapply(.SD, sum, na.rm=TRUE), by=.(LINESET, TimeInterval)]
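
If the table carries non-numeric columns besides the grouping keys, it may be worth restricting .SD so only the counters are summed (column names taken from your script):

cols <- c("SmsIn", "SmsOut", "CallIn", "CallOut", "Internet")
Short[, lapply(.SD, sum, na.rm = TRUE),
      by = .(LINESET, TimeInterval), .SDcols = cols]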

Upvotes: 2
