Reputation: 13585
This seems like a very straightforward implementation, but it looks like there are some issues.
The job reads UI event data from a Kafka topic, does some aggregation, and writes the results to an Aerospike database.
Under high traffic I start seeing an issue where the job is running fine but no new data is being inserted. Looking at the logs I see these WARNING messages:
Current batch is falling behind. The trigger interval is 30000 milliseconds, but spent 43491 milliseconds
A few times the job resumes writing data, but the counts are low, which indicates some data loss.
Here is the code:
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;

Dataset<Row> stream = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
        .option("subscribe", newTopic)
        .option("startingOffsets", "latest")
        .option("enable.auto.commit", false)
        .option("failOnDataLoss", false)
        .load();

StreamingQuery query = stream
        .writeStream()
        .option("startingOffsets", "earliest")
        .outputMode(OutputMode.Append())
        .foreach(sink)
        .trigger(Trigger.ProcessingTime(triggerInterval))
        .queryName(queryName)
        .start();
Upvotes: 5
Views: 2645
Reputation: 1708
You may need to set maxOffsetsPerTrigger to cap the total number of input records per batch. Otherwise, once your application lags, each batch pulls in more records, which slows the next batch down and in turn causes even more lag in the following batches.
Please refer to the link below for more details on the Kafka configuration options.
https://spark.apache.org/docs/2.4.0/structured-streaming-kafka-integration.html
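For example, you could cap the batch size on the Kafka reader like this (a minimal sketch based on the code in the question; the value 10000 is a placeholder that you would need to tune to your throughput and trigger interval):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Same reader as in the question, with a rate limit added.
Dataset<Row> stream = sparkSession.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", kafkaBootstrapServersString)
        .option("subscribe", newTopic)
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", false)
        // Cap the number of Kafka records read per micro-batch so each
        // batch can finish within the 30-second trigger interval.
        .option("maxOffsetsPerTrigger", 10000)
        .load();

With this cap in place, a single slow batch no longer snowballs: the next batch reads at most the configured number of offsets instead of the entire accumulated lag.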
Upvotes: 2