Stephan

Reputation: 411

Stream to RDD to DataFrame to CSV

I am really getting desperate here.

What I am trying to do is capture a stream, aggregate the stream data for a few seconds, and then save it as a CSV file.

val sparkSession : SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("Streaming")
  .config(conf)
  //.enableHiveSupport()
  .getOrCreate()

So, I am capturing the stream

val lines = streamingContext.socketTextStream(HOST, PORT)
val linesMessage = lines.map(_.split(DELIMITER)(1))

and count the incidents

val counts = linesMessage.map(tag => (tag, 1))
        .reduceByKeyAndWindow({ (x, y) => x + y }, { (x, y) => x - y }, Seconds(EVENT_PERIOD_SECONDS*4), Seconds(EVENT_PERIOD_SECONDS))

which is working so far.

Now, I would like to save each windowLength in a CSV file, and I am stuck there:

  val schema = new StructType()
    .add(StructField("text", StringType, true))
    .add(StructField("count", IntegerType, true))

  counts.foreachRDD(rdd =>
  {
    //rdd.saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))

    val df = sparkSession.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema)
    df.write.format("csv").save(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
  })

Can anybody assist me with that please?

Sorry, I forgot the error:

When I just run rdd.saveAsTextFile, it creates various plain text files that have to be merged.

When calling createDataFrame I get this error:

17/11/12 23:06:04 ERROR JobScheduler: Error running job streaming job 1510490490000 ms.1

java.lang.NullPointerException
    at org.apache.spark.sql.SparkSession.sessionState$lzycompute(SparkSession.scala:128)
    at org.apache.spark.sql.SparkSession.sessionState(SparkSession.scala:126)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:65)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:578)
    at org.apache.spark.sql.SparkSession.createDataFrame(SparkSession.scala:335)
    at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:152)
    at main.scala.Main$$anonfun$main$scala$Main$$functionToCreateContext$1$1.apply(Main.scala:146)

Line 146 is the sparkSession.createDataFrame line

Upvotes: 0

Views: 1538

Answers (2)

Stephan

Reputation: 411

I "solved" it by changing my code like this:

  // Non-overlapping window: window length == slide interval, so each batch is written exactly once
  linesFilter.window(Seconds(EVENT_PERIOD_SECONDS*WRITE_EVERY_N_SECONDS), Seconds(EVENT_PERIOD_SECONDS*WRITE_EVERY_N_SECONDS)).foreachRDD { (rdd, time) =>
    if (rdd.count() > 0) {
      rdd
        .coalesce(1,true)                                    // shuffle everything into a single partition -> single part file
        .map(_.replace(DELIMITER_STREAM, DELIMITER_OUTPUT))  // swap the stream delimiter for the output (CSV) delimiter
        //.map{_.mkString(DELIMITER_OUTPUT) }
        .saveAsTextFile(CHECKPOINT_DIR + "/output/o_" + sdf.format(System.currentTimeMillis()))
    }
  }
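
coalesce(1, true) (with the shuffle flag enabled it behaves like repartition(1)) pushes everything into a single partition, so each window is written as one part file instead of several files that would have to be merged afterwards.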

Upvotes: 1

Alexandre Dupriez

Reputation: 3036

Possible cause and workaround

The problem could come from the sparkSession, which may be decoupled from the streamingContext.

It might be worth trying to get the Spark session from the streamingContext's SparkContext, to ensure both share the same configuration:

counts.foreachRDD(rdd => {
    val spark = SparkSession.builder.config(streamingContext.sparkContext.getConf).getOrCreate()
    val df = spark.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema)
    ...
})
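
For completeness, a minimal sketch of what the write loop could look like under that assumption; counts, CHECKPOINT_DIR, sdf and the schema are taken from the question, and the session is (lazily) obtained from the RDD's own SparkContext, which is the usual pattern for combining DataFrames with foreachRDD:

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = new StructType()
  .add(StructField("text", StringType, true))
  .add(StructField("count", IntegerType, true))

counts.foreachRDD { rdd =>
  // Obtain (or lazily create) the session from the RDD's SparkContext so that
  // the streaming side and the SQL side share the same configuration.
  val spark = SparkSession.builder.config(rdd.sparkContext.getConf).getOrCreate()

  val df = spark.createDataFrame(rdd.map(attributes => Row(attributes._1, attributes._2)), schema)
  df.coalesce(1)                 // one part file per window instead of one per partition
    .write
    .format("csv")
    .save(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
}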

Partitioning

As mentioned in the comments, if your objective is to reduce the number of files created by Spark, you can just use repartition on the RDD directly provided to your function:

counts.foreachRDD(rdd => {
    rdd.repartition(10)
       .saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf
       .format(System.currentTimeMillis()))
})

repartition should be used carefully though, as you need a good estimate of the size of the resulting partitions to avoid under- or over-sizing them.
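
If the aim is simply to end up with a single file per window and the windowed data is small, a lighter alternative is coalesce, which merges existing partitions without forcing a full shuffle; a minimal sketch along the lines of the snippet above:

counts.foreachRDD(rdd => {
    rdd.coalesce(1)   // merge down to one partition without a full shuffle
       .saveAsTextFile(CHECKPOINT_DIR + "/output_" + sdf.format(System.currentTimeMillis()))
})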

Upvotes: 0
