Reputation: 472
Hi, I'm working with SparkR in distributed mode on a YARN cluster.
I have two questions:
1) If I write a script that contains both plain R code and SparkR code, will it distribute just the SparkR code, or the plain R code too?
This is the script. I read a CSV and take just the first 100k records.
I clean them (with R functions), zeroing out the NA values, and create a SparkR DataFrame.
This is what it does: for each LINESET, take each TimeInterval where that LINESET appears, sum some numeric attributes, and put them all into a matrix.
This is the script with both R and SparkR code. It takes 7h in standalone mode and around 60h in distributed mode (killed by java.net.SocketException: Broken pipe).
library(data.table)   # for fread; SparkR is assumed to be attached, with sqlContext initialized

LineSmsInt <- fread("/home/sentiment/Scrivania/LineSmsInt.csv")
Short <- LineSmsInt[1:100000, ]
Short[is.na(Short)] <- 0
Short$TimeInterval <- Short$TimeInterval / 1000
ShortDF <- createDataFrame(sqlContext, Short)

UniqueLineSet <- unique(Short$LINESET)
UniqueTime <- as.numeric(unique(Short$TimeInterval))

Row <- length(UniqueLineSet) * length(UniqueTime)
IntTemp <- matrix(nrow = Row, ncol = 7)
colnames(IntTemp) <- c("LINESET", "TimeInterval", "SmsIn", "SmsOut", "CallIn", "CallOut", "Internet")
k <- 1

Sys.time()
for (i in 1:length(UniqueLineSet)) {
  # Lazy transformation: no Spark job is triggered yet
  SubSetID <- filter(ShortDF, ShortDF$LINESET == UniqueLineSet[i])
  for (j in 1:length(UniqueTime)) {
    SubTime <- filter(SubSetID, SubSetID$TimeInterval == UniqueTime[j])
    IntTemp[k, 1] <- UniqueLineSet[i]
    IntTemp[k, 2] <- as.numeric(UniqueTime[j])
    # Each collect() below triggers a separate Spark job that scans the data
    # and fetches the result to the driver
    IntTemp[k, 3] <- collect(select(SubTime, sum(SubTime$SmsIn)))[1, 1]
    IntTemp[k, 4] <- collect(select(SubTime, sum(SubTime$SmsOut)))[1, 1]
    IntTemp[k, 5] <- collect(select(SubTime, sum(SubTime$CallIn)))[1, 1]
    IntTemp[k, 6] <- collect(select(SubTime, sum(SubTime$CallOut)))[1, 1]
    IntTemp[k, 7] <- collect(select(SubTime, sum(SubTime$Internet)))[1, 1]
    k <- k + 1
  }
  print(UniqueLineSet[i])
  print(i)
}
This is the plain R script; the only thing that changes is the subset function and, of course, it works on a normal R data.frame, not a SparkR DataFrame.
It takes 1.30 minutes in standalone mode.
Why is it so fast in plain R and so slow in SparkR?
for (i in 1:length(UniqueLineSet)) {
  SubSetID <- subset.data.frame(LineSmsInt, LINESET == UniqueLineSet[i])
  for (j in 1:length(UniqueTime)) {
    SubTime <- subset.data.frame(SubSetID, TimeInterval == UniqueTime[j])
    IntTemp[k, 1] <- UniqueLineSet[i]
    IntTemp[k, 2] <- as.numeric(UniqueTime[j])
    IntTemp[k, 3] <- sum(SubTime$SmsIn, na.rm = TRUE)
    IntTemp[k, 4] <- sum(SubTime$SmsOut, na.rm = TRUE)
    IntTemp[k, 5] <- sum(SubTime$CallIn, na.rm = TRUE)
    IntTemp[k, 6] <- sum(SubTime$CallOut, na.rm = TRUE)
    IntTemp[k, 7] <- sum(SubTime$Internet, na.rm = TRUE)
    k <- k + 1
  }
  print(UniqueLineSet[i])
  print(i)
}
2) The first script, in distributed mode, was killed by:
java.net.SocketException: Broken pipe
and sometimes this appears too:
java.net.SocketTimeoutException: Accept timed out
Could this come from a bad configuration? Any suggestions?
Thanks.
Upvotes: 2
Views: 271
Reputation: 330063
Don't take this the wrong way, but this is not a particularly well-written piece of code. It is already inefficient in core R, and adding SparkR to the equation makes it even worse.
If I write a script that contains both plain R code and SparkR code, will it distribute just the SparkR code, or the plain R code too?
Unless you're using distributed data structures and functions that operate on those structures, it is just plain R code executed in a single thread on the master.
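To make that concrete, here is a minimal sketch (variable names are illustrative, and a SparkR 1.x sqlContext is assumed to be initialized) of which lines run where:

localDF <- data.frame(x = 1:10)        # plain R: lives in the driver process
localDF$x <- localDF$x * 2             # plain R: single-threaded, never distributed

distDF <- createDataFrame(sqlContext, localDF)  # SparkR DataFrame: distributed from here on
head(filter(distDF, distDF$x > 5))              # lazy filter; head() triggers a Spark job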
Why is it so fast in plain R and so slow in SparkR?
For starters, you execute a single job for each combination of LINESET, UniqueTime, and column — length(UniqueLineSet) * length(UniqueTime) * 5 jobs in total. Each time, Spark has to scan all the records and fetch the data to the driver.
Moreover, using Spark to handle data that can easily be handled in the memory of a single machine simply doesn't make sense. In a case like this, the cost of scheduling and running the jobs is usually much higher than the cost of the actual processing.
Any suggestions?
If you really want to use SparkR, just group_by and agg. Note that the aggregation has to run on the SparkR DataFrame (ShortDF), not on the local data.table (Short):

library(magrittr)  # provides %>%; SparkR does not attach it

aggDF <- group_by(ShortDF, ShortDF$LINESET, ShortDF$TimeInterval) %>%
  agg(sum(ShortDF$SmsIn), sum(ShortDF$SmsOut), sum(ShortDF$CallIn),
      sum(ShortDF$CallOut), sum(ShortDF$Internet))
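The aggregation itself is lazy; a single collect(aggDF) afterwards computes everything in one job and brings back only the aggregated rows, which are tiny compared to the raw data, instead of one job per (LINESET, TimeInterval, column) combination.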
If you care about missing (LINESET, TimeInterval) pairs, fill these in using either join or unionAll.
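Here is a hedged sketch of the join route, reusing the aggDF result from above (column handling simplified; missing pairs come back with NA sums):

lines <- distinct(select(ShortDF, ShortDF$LINESET))
times <- distinct(select(ShortDF, ShortDF$TimeInterval))
allPairs <- join(lines, times)   # no join expression, so this is a Cartesian product

filled <- join(allPairs, aggDF,
               allPairs$LINESET == aggDF$LINESET &
                 allPairs$TimeInterval == aggDF$TimeInterval,
               "left_outer")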
In practice I would simply stick with a data.table and aggregate locally:

Short[, lapply(.SD, sum, na.rm = TRUE), by = .(LINESET, TimeInterval)]
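For reference, a variant that restricts .SD to the numeric columns (column names assumed from the question's script):

Short[, lapply(.SD, sum, na.rm = TRUE),
      by = .(LINESET, TimeInterval),
      .SDcols = c("SmsIn", "SmsOut", "CallIn", "CallOut", "Internet")]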
Upvotes: 2