Reputation: 411
I am really getting desperate here.
What I am trying to do is capture a stream, aggregate the stream data for a few seconds, and then save it as a CSV file.
val sparkSession : SparkSession = SparkSession.builder()
.master("local[*]")
.appName("Streaming")
.config(conf)
//.enableHiveSupport()
.getOrCreate()
So, I am capturing the stream:
val lines = streamingContext.socketTextStream(HOST, PORT)
val linesMessage = lines.map(_.split(DELIMITER)(1))
and counting the incidents:
val counts = linesMessage.map(tag => (tag, 1))
.reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y }, Seconds(EVENT_PERIOD_SECONDS*4), Seconds(EVENT_PERIOD_SECONDS))
which is working so far.
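(Side note: since I use the inverse-reduce form of reduceByKeyAndWindow, checkpointing is enabled on the context, with roughly a call like this, CHECKPOINT_DIR being my checkpoint path:)
streamingContext.checkpoint(CHECKPOINT_DIR)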
Now I would like to save each window in a CSV file, and that is where I am stuck:
val schema = new StructType()
.add(StructField("text", StringType, true))
.add(StructField("count", IntegerType, true))
counts.foreachRDD(rdd => {
  //rdd.saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
  val df = sparkSession.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema)
  df.write.format("csv").save(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
})
Can anybody assist me with that please?
Sorry, I forgot to add the error:
When I just run rdd.saveAsTextFile, it creates several plain text files that have to be merged.
With createDataFrame I get this error:
17/11/12 23:06:04 ERROR JobScheduler: Error running job streaming job 1510490490000 ms.1
java.lang.NullPointerException
at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)
at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:578)
at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:335)
at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:152)
at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:146)
Line 146 is the sparkSession.createDataFrame line
Upvotes: 0
Views: 1538
Reputation: 411
I "solved" it by changing my code like this:
linesFilter.window(Seconds(EVENT_PERIOD_SECONDS * WRITE_EVERY_N_SECONDS), Seconds(EVENT_PERIOD_SECONDS * WRITE_EVERY_N_SECONDS)).foreachRDD { (rdd, time) =>
  if (rdd.count() > 0) {
    rdd
      .coalesce(1, true)
      .map(_.replace(DELIMITER_STREAM, DELIMITER_OUTPUT))
      //.map{_.mkString(DELIMITER_OUTPUT) }
      .saveAsTextFile(CHECKPOINT_DIR + "/output/o_" + sdf.format(System.currentTimeMillis()))
  }
}
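A small variation of this (just a sketch, using the time parameter that foreachRDD already passes in): the output directory could be named after the window's batch time instead of the wall clock, which makes the names deterministic per window:
rdd
  .coalesce(1, true)
  .map(_.replace(DELIMITER_STREAM, DELIMITER_OUTPUT))
  .saveAsTextFile(CHECKPOINT_DIR + "/output/o_" + sdf.format(time.milliseconds))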
Upvotes: 1
Reputation: 3036
Possible cause and workaround
The problem could come from the sparkSession, which may not be coupled to the streamingContext. It might be worth trying to get the Spark session from the streamingContext, to ensure both share the same configuration:
counts.foreachRDD(rdd => {
  val spark = SparkSession.builder.config(streamingContext.sparkContext.getConf).getOrCreate()
  val df = spark.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema)
  ...
})
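For completeness, a minimal sketch of how the whole block could then look, reusing the schema, sdf and CHECKPOINT_DIR from your question (the coalesce(1) is only an assumption, in case you want a single CSV part file per window):
counts.foreachRDD(rdd => {
  // recover the session from the streaming context's configuration
  val spark = SparkSession.builder.config(streamingContext.sparkContext.getConf).getOrCreate()
  val df = spark.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema)
  df.coalesce(1) // assumption: one CSV part file per window is wanted
    .write
    .format("csv")
    .save(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
})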
Partitioning
As mentioned in the comments, if your objective is to reduce the number of files created by Spark, you can just use repartition on the RDD that is provided directly to your function:
counts.foreachRDD(rdd => {
  rdd.repartition(10)
    .saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
})
repartition should be used very carefully though, as you need to have good estimations of the size of the resulting partitions to avoid under- or over-sizing them.
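If the goal is only to reduce the number of part files (for example down to one per window), a sketch with coalesce avoids the full shuffle that repartition performs, at the cost of funnelling each window through fewer tasks:
counts.foreachRDD(rdd => {
  rdd.coalesce(1) // narrow dependency, no shuffle
    .saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
})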
Upvotes: 0