Noa Be

Spark Structured Streaming writeStream to output one global csv

I am currently making a raw log data aggregator using Spark Structured Streaming.

The input stream reads a directory of text files:

// == Input == //

val logsDF = spark.readStream
  .format("text")
  .option("maxFilesPerTrigger", 1)
  .load("input/*")

Logs are then parsed ...

// == Parsing == //

val logsDF2 = ...

... and aggregated

// == Aggregation == //

val windowedCounts = logsDF2
  .withWatermark("window_start", "15 minutes")
  .groupBy(
    col("window"),
    col("node")
  ).count()
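
The file sink only supports append mode. For a streaming aggregation in append mode, Spark needs a watermark on the same event-time column the query groups by (directly or through window()), and a window's count is only emitted once the watermark passes the end of that window. In the snippet above the watermark is on window_start while the grouping key is window, which is worth checking. The parsing code is elided, so this minimal sketch assumes it yields a TimestampType column named timestamp (hypothetical) and a node column. The 5-minute window width and the object name WindowedLogCounts are illustrative choices:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

object WindowedLogCounts {
  // Hypothetical input schema: the (elided) parsing step is assumed to produce
  // an event-time column "timestamp" (TimestampType) and a "node" column.
  def countPerWindow(parsedLogs: DataFrame): DataFrame =
    parsedLogs
      // Watermark the same column the window is built from, so that append
      // mode can decide when a window is final and emit its count.
      .withWatermark("timestamp", "15 minutes")
      // 5-minute tumbling windows per node (the width is an arbitrary example).
      .groupBy(window(col("timestamp"), "5 minutes"), col("node"))
      .count()
}
```

In the question's pipeline this would be called as WindowedLogCounts.countPerWindow(logsDF2), provided logsDF2 has those two columns.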

Everything works fine when I use the "console" sink: the results are updated batch by batch in the console:

// == Output == //

val query = windowedCounts.writeStream
  .format("console")
  .outputMode("complete")
  .start()

query.awaitTermination()

Now I want to save my results in a single file (JSON, Parquet, CSV, ...):

// == Output == //

val query = windowedCounts.writeStream
  .format("csv")
  .option("checkpointLocation", "checkpoint/")
  .start("output/")

query.awaitTermination()

But it outputs 400 empty CSV files. How can I get my results as I did in the console?

Thank you very much!

Answers (1)

FVCC

This was asked a long time ago, but I ran into the same issue myself and thought I would share what worked. I think your code is fine up to the point where you sink the data into a CSV file. Try changing the writeStream CSV code to this:

// == Output == //
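import org.apache.spark.sql.streaming.Trigger

val query = windowedCounts.writeStream
  .format("csv")
  .trigger(Trigger.ProcessingTime("10 seconds"))  // Scala API; processingTime="..." is PySpark syntax
  .option("checkpointLocation", "checkpoint/")
  .option("path", "output_path/")
  .outputMode("append")
  .start()

query.awaitTermination()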

The line:

.trigger(Trigger.ProcessingTime("10 seconds"))

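should reduce the number of files, since a new micro-batch (and its set of part files) is only started every 10 seconds instead of as soon as the previous one finishes. Each micro-batch can still write one part file per shuffle partition (spark.sql.shuffle.partitions, 200 by default), so lowering that setting before the query first runs against its checkpoint may help too. Both of these lines: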

.option("path", "output_path/")
.outputMode("append")

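should help with the empty file problem: the path option sets the output directory explicitly (equivalent to passing it to start()), and append is the mode the file sink supports. Note that in append mode a windowed count is written only once, after the watermark has passed the end of its window, so the first files can still be empty until newer log lines move the watermark forward.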
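
If the goal is literally one file holding what the console showed, the file sink cannot do that on its own: it only ever adds new part files. foreachBatch (Spark 2.4 and later) can instead rewrite a single CSV with the complete result on every micro-batch. This is a minimal sketch, not the approach from the answer above. It assumes windowedCounts has a struct column window (as produced by Spark's window() function) plus node and count columns. The struct is flattened because the CSV writer cannot serialize struct columns. The object name SingleCsvSink and the directory arguments are hypothetical:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

object SingleCsvSink {
  // Returns a batch writer that replaces outputDir with a single CSV part file.
  // Assumes columns: window (struct with start/end), node, count.
  def writeBatch(outputDir: String): (DataFrame, Long) => Unit =
    (batchDF: DataFrame, batchId: Long) =>
      batchDF
        // CSV cannot hold struct columns, so flatten the window bounds.
        .select(
          col("window.start").as("window_start"),
          col("window.end").as("window_end"),
          col("node"),
          col("count"))
        .coalesce(1)         // one partition -> one part-*.csv file
        .write
        .mode("overwrite")   // replace the previous snapshot
        .option("header", "true")
        .csv(outputDir)

  // Complete mode hands the full aggregate to foreachBatch on every trigger.
  def start(windowedCounts: DataFrame, outputDir: String, checkpointDir: String): StreamingQuery =
    windowedCounts.writeStream
      .outputMode("complete")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .option("checkpointLocation", checkpointDir)
      // Passing a typed function value (not a lambda literal) avoids the
      // Scala/Java foreachBatch overload ambiguity.
      .foreachBatch(writeBatch(outputDir))
      .start()
}
```

Keep the trade-offs in mind. Complete mode keeps every window in state, because the watermark cannot drop it. Overwrite replaces the directory on each batch, so a reader may briefly find it empty. Spark also names the file part-*.csv inside the directory rather than letting you choose a file name.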
