Reputation: 165
I'm reading data from a Kafka topic and I'm putting it to Azure ADLS (HDFS Like) in partitioned mode.
My code is like below :
val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.option("failOnDataLoss", false)
.load()
.selectExpr(/*"CAST(key AS STRING)",*/ "CAST(value AS STRING)").as(Encoders.STRING)
df.writeStream
.partitionBy("year", "month", "day", "hour", "minute")
.format("parquet")
.option("path", outputDirectory)
.option("checkpointLocation", checkpointDirectory)
.outputMode("append")
.start()
.awaitTermination()
I have about 2000 records/sec, and my problem is that Spark is inserting the data every 45sec, and I want the data to be inserted immediately.
Anyone know how to control the size of micro batch ?
Upvotes: 1
Views: 511
Reputation: 4133
From the Spark 2.3 version it is available the Continuous processing mode. In the official doc. you can read that only three sinks are supported for this mode and only the Kafka sink is ready for production, and "the end-to-end low-latency processing can be best observed with Kafka as the source and sink"
df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("checkpointLocation", "/tmp/0")
.option("topic", "output0")
.trigger(Trigger.Continuous("0 seconds"))
.start()
So, it seems that, at the moment, you can´t use HDFS as sink using the Continuous mode. In your case maybe you can test Akka Streams and the Alpakka connector
Upvotes: 2