Reputation: 70
I'm trying to query two tables from Cassandra into two DataFrames, then join them into a single DataFrame (result).
I get the correct result and the Spark job finishes normally when I run it from Eclipse on my computer. But when I submit it to the Spark server (local mode), the job just hangs without any exception or error message. It still hadn't finished after an hour, when I pressed Ctrl+C to stop it.
I have no idea why the job doesn't work on the Spark server, or what the difference is between Eclipse and the Spark server. If the cause is an OutOfMemory problem, is it possible that Spark would just hang without throwing any exception?
Any advice? Thanks~
Submit command
/usr/bin/spark-submit --class com.test.c2c --jars file:///home/iotcloud/Documents/grace/spark/spark-cassandra-connector-1.6.3-s_2.10.jar file:///home/iotcloud/Documents/grace/spark/C2C_1205.jar
Here is my Scala code:
package com.test

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import com.datastax.spark.connector.cql._
import com.datastax.spark.connector._
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql._
import org.apache.spark.sql.cassandra._

object c2c {
  def main(args: Array[String]) {
    println("Start...")
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.2.1.67")
      .setAppName("ConnectToCassandra")
      .setMaster("local")
    val sc = new SparkContext(conf)
    println("Cassandra setting done...")
    println("================================================1")
    println("Start to save to cassandra...")
    val cc = new CassandraSQLContext(sc)
    cc.setKeyspace("iot_test")
    val df_info = cc.sql("select gatewaymac,sensormac,sensorid,sensorfrequency,status from tsensor_info where gatewaymac != 'failed'")
    val df_loc = cc.sql("select sensorlocationid,sensorlocationname,company,plant,department,building,floor,sensorid from tsensorlocation_info where sensorid != 'NULL'")
    println("================================================2")
    println("registerTmepTable...")
    df_info.registerTempTable("info")
    df_loc.registerTempTable("loc")
    println("================================================4")
    println("mapping table...")
    println("===info===")
    df_info.printSchema()
    df_info.take(5).foreach(println)
    println("===location===")
    df_loc.printSchema()
    df_loc.take(5).foreach(println)
    println("================================================5")
    println("print mapping result")
    val result = df_info.join(df_loc, "sensorid")
    result.registerTempTable("ref")
    result.printSchema()
    result.take(5).foreach(println)
    println("====Finish====")
    sc.stop()
  }
}
Normal result on Eclipse
Cassandra setting done...
================================================1
Start to save to cassandra...
================================================2
registerTmepTable...
================================================4
mapping table...
===info===
root
|-- gatewaymac: string (nullable = true)
|-- sensormac: string (nullable = true)
|-- sensorid: string (nullable = true)
|-- sensorfrequency: string (nullable = true)
|-- status: string (nullable = true)
[0000aaaaaaaaat7f,e9d050f0ebc25000 ,0000aaaaaaaaat7f3242,null,N]
[000000000000b219,c879b4f921c25000 ,000000000000b2193590,00:01,N]
[0000aaaaaaaaaabb,2c153cf9f0c25000 ,0000aaaaaaaaaabba353,null,Y]
[000000000000a412,17da712795c25000 ,000000000000a4126156,00:05,Y]
[000000000000a104,b2a4b8b7a6c25000 ,000000000000a1046340,00:01,N]
===location===
root
|-- sensorlocationid: string (nullable = true)
|-- sensorlocationname: string (nullable = true)
|-- company: string (nullable = true)
|-- plant: string (nullable = true)
|-- department: string (nullable = true)
|-- building: string (nullable = true)
|-- floor: string (nullable = true)
|-- sensorid: string (nullable = true)
[JA092,A1F-G-L00-S066,IAC,IACJ,MT,A,1,000000000000a108a19f]
[JA044,A2F-I-L00-S037,IAC,IACJ,MT,A,2,000000000000a2024246]
[JA111,A2F-C-L00-S076,IAC,IACJ,MPA,A,2,000000000000a210c710]
[PA041,A1F-SMT-S03,IAC,IACP,SMT,A,1,000000000000a10354c1]
[PC010,C3F-IQC-S03,IAC,IACP,IQC,C,3,000000000000c3269786]
================================================5
print mapping result
root
|-- sensorid: string (nullable = true)
|-- gatewaymac: string (nullable = true)
|-- sensormac: string (nullable = true)
|-- sensorfrequency: string (nullable = true)
|-- status: string (nullable = true)
|-- sensorlocationid: string (nullable = true)
|-- sensorlocationname: string (nullable = true)
|-- company: string (nullable = true)
|-- plant: string (nullable = true)
|-- department: string (nullable = true)
|-- building: string (nullable = true)
|-- floor: string (nullable = true)
[000000000000a10275bc,000000000000a102,e85ce9b9d2c25000 ,00:05,Y,PA030,A1F-WM-S02,IAC,IACP,WM,A,1]
[000000000000b117160c,000000000000b117,33915a79e5c25000 ,00:05,Y,PB011,B1F-WM-S01,IAC,IACP,WM,B,1]
[000000000000a309024b,000000000000a309,afdab2efbbc25000 ,00:00,N,PA101,A3F-MP6-R01,IAC,IACP,MP6,A,3]
[000000000000c6294109,000000000000c629,383cca8e45c25000 ,00:05,Y,PC017,C6F-WM-S01,IAC,IACP,WM,C,6]
[000000000000a205e52e,000000000000a205,8d83303cf4c25000 ,00:00,N,PA063,A2F-MP6-R04,IAC,IACP,MP6,A,2]
====Finish====
Upvotes: 1
Views: 1577
Reputation: 70
I finally found the cause: I forgot to set the master to the Spark standalone cluster, so I was submitting the job to the local Spark server.
After setting the master to the Spark standalone cluster, the job works fine. Maybe the local Spark server doesn't have enough cores to execute the tasks. (It's an old machine with 2 cores. By the way, the Spark standalone cluster has 4 nodes, and they are all old machines.)
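For anyone hitting the same thing, here is a minimal sketch of one way to avoid hardcoding the master, so that the same jar can run both from Eclipse and on the cluster. It relies on the fact that a master set in code through SparkConf takes precedence over the --master flag of spark-submit. The object name C2CMasterSketch and the local[*] fallback are my own assumptions for illustration, not part of the original job.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical object name, used only for this illustration.
object C2CMasterSketch {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf(true)
      .set("spark.cassandra.connection.host", "10.2.1.67")
      .setAppName("ConnectToCassandra")
      // No setMaster(...) here: a master set in code would override
      // whatever is passed to spark-submit with --master.
      // Assumed fallback for IDE runs only: local[*] uses all local cores,
      // whereas plain "local" runs with a single worker thread.
      .setIfMissing("spark.master", "local[*]")

    val sc = new SparkContext(conf)
    // Print the master actually in use, to confirm where the job runs.
    println("Running with master: " + sc.master)
    sc.stop()
  }
}
```

With this, the master would be chosen at submit time by adding --master spark://<master-host>:7077 to the spark-submit command, where <master-host> is a placeholder for the standalone master's address and 7077 is the default standalone port.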
Upvotes: 1