Reputation: 337
I am currently building a raw log data aggregator using Spark Structured Streaming.
The input stream is created from a directory of text files:
// == Input == //
val logsDF = spark.readStream
.format("text")
.option("maxFilesPerTrigger", 1)
.load("input/*")
Logs are then parsed ...
// == Parsing == //
val logsDF2 = ...
... and aggregated
// == Aggregation == //
val windowedCounts = logsDF2
.withWatermark("window_start", "15 minutes")
.groupBy(
col("window"),
col("node")
).count()
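A typical way to build this kind of windowed count, so that the watermark column and the grouping key line up, is to apply the watermark to the event-time column and derive the window from it. A minimal sketch only; the "timestamp" column name is an assumption, since the parsing step is omitted above:
import org.apache.spark.sql.functions.{col, window}
// Hedged sketch: "timestamp" is an assumed event-time column produced by the parsing step
val windowedCounts = logsDF2
.withWatermark("timestamp", "15 minutes")
.groupBy(
window(col("timestamp"), "15 minutes"),
col("node")
).count()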
Everything works fine when I use the "console" sink: the results are updated batch by batch in the console:
// == Output == //
val query = windowedCounts.writeStream
.format("console")
.outputMode("complete")
.start()
.awaitTermination()
Now I want to save my results to a single file (JSON, Parquet, CSV, ...):
// == Output == //
val query = windowedCounts.writeStream
.format("csv")
.option("checkpointLocation", "checkpoint/")
.start("output/")
.awaitTermination()
But it outputs 400 empty CSV files... How can I get my results as I did in the console?
Thank you very much!
Upvotes: 4
Views: 9524
Reputation: 317
This was a long time ago, but I ran into this issue myself and thought I would share the solution. Your code looks fine until you try to sink the data into a CSV file. Try changing the writeStream CSV code to this:
// == Output == //
import org.apache.spark.sql.streaming.Trigger

val query = windowedCounts.writeStream
.format("csv")
.trigger(Trigger.ProcessingTime("10 seconds"))
.option("checkpointLocation", "checkpoint/")
.option("path", "output_path/")
.outputMode("append")
.start()
.awaitTermination()
The line:
.trigger(Trigger.ProcessingTime("10 seconds"))
should take care of your 400 files, since it only writes a new file every 10 seconds. Both of these lines:
.option("path", "output_path/")
.outputMode("append")
should fix the empty-file problem, since you append the latest values and write them into a specific output directory.
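Note that the file sink only supports the append output mode, so outputMode("complete") from the console example cannot be carried over. It also always writes a directory of part files rather than one single file. If you really need a single file, one option is to read the streamed output back in a separate batch job and coalesce it into one partition; a minimal sketch, where the directory names are just examples:
// Batch consolidation sketch: reads the streaming output directory back as a batch DataFrame
val consolidated = spark.read.csv("output_path/")

consolidated
.coalesce(1) // force a single partition, hence a single part file
.write
.mode("overwrite")
.csv("output_single/") // hypothetical directory; it will contain one part file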
Upvotes: 5