Reputation: 1181
I am facing a data loss issue in Spark Streaming with Kafka; my use case is as follows:
- A Spark Streaming (DirectStream) application reads messages from a Kafka topic and processes them.
- Based on the processed message, the app writes it to different Kafka topics, e.g. if the message is harmonized it is written to the harmonized topic, otherwise to the unharmonized topic.
Now, the problem is that during streaming I am somehow losing some messages, i.e. not all incoming messages are written to the harmonized or unharmonized topics. For example, if the app receives 30 messages in one batch, it sometimes writes all of them to the output topics (the expected behaviour), but sometimes it writes only 27 (3 messages are lost; this number can vary).
Following are the versions I am using:
- Spark 1.6.0
- Kafka 0.9
Kafka topic configuration is as follows:
- number of brokers: 3
- replication factor: 3
- number of partitions: 3
Following are the properties I am using for Kafka:
val props = new Properties()
props.put("metadata.broker.list", properties.getProperty("metadataBrokerList"))
props.put("auto.offset.reset", properties.getProperty("autoOffsetReset"))
props.put("group.id", properties.getProperty("group.id"))
props.put("serializer.class", "kafka.serializer.StringEncoder")
props.put("outTopicHarmonized", properties.getProperty("outletKafkaTopicHarmonized"))
props.put("outTopicUnharmonized", properties.getProperty("outletKafkaTopicUnharmonized"))
props.put("acks", "all");
props.put("retries", "5");
props.put("request.required.acks", "-1")
Following is the piece of code where I am writing processed messages to Kafka:
val schemaRdd2 = finalHarmonizedDF.toJSON
schemaRdd2.foreachPartition { partition =>
  val producerConfig = new ProducerConfig(props)
  val producer = new Producer[String, String](producerConfig)
  partition.foreach { row =>
    if (debug) println(row.mkString)
    val keyedMessage = new KeyedMessage[String, String](props.getProperty("outTopicHarmonized"),
      null, row.toString())
    producer.send(keyedMessage)
  }
  // hack, should be done with flush
  Thread.sleep(1000)
  producer.close()
}
I have explicitly added sleep(1000) for testing purposes, but this does not solve the problem either :(
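For reference, the newer org.apache.kafka.clients.producer.KafkaProducer (available with Kafka 0.9) exposes a flush() method; the following is only a rough sketch of how the per-partition write could look with it, using the new-producer property keys (I have not tested this in my setup):

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

val producerProps = new java.util.Properties()
// "bootstrap.servers" replaces "metadata.broker.list" for the new producer
producerProps.put("bootstrap.servers", properties.getProperty("metadataBrokerList"))
producerProps.put("acks", "all")
producerProps.put("retries", "5")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

schemaRdd2.foreachPartition { partition =>
  val producer = new KafkaProducer[String, String](producerProps)
  try {
    partition.foreach { row =>
      producer.send(new ProducerRecord[String, String](props.getProperty("outTopicHarmonized"), row))
    }
    // block until every buffered record has been sent (or reported as failed)
    producer.flush()
  } finally {
    producer.close()
  }
}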
Any suggestion would be appreciated.
Upvotes: 9
Views: 1370
Reputation: 1123
Because you don't want to lose any messages, you might want to choose the 'exactly once' delivery semantics, which ensures no data loss. To configure exactly-once delivery semantics you have to use acks='all', which you did.
According to this resource [1], the acks='all' property must be used in conjunction with the min.insync.replicas property.
[1] https://www.linkedin.com/pulse/kafka-producer-delivery-semantics-sylvester-daniel/
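As an illustration (hypothetical broker hosts, and assuming your replication factor of 3), the producer side could look like the sketch below; note that min.insync.replicas itself is a topic/broker-level setting, not a producer property:

val producerProps = new java.util.Properties()
producerProps.put("bootstrap.servers", "broker1:9092,broker2:9092,broker3:9092") // example hosts
producerProps.put("acks", "all") // wait for all in-sync replicas to acknowledge
producerProps.put("retries", "5")
producerProps.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
producerProps.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
// On the topic itself, set min.insync.replicas=2 so that acks=all only succeeds
// once at least two replicas have the write.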
Upvotes: 0
Reputation: 6309
Try to tune the batchDuration parameter (when initializing the StreamingContext) to a value larger than the processing time of each RDD. This solved my problem.
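A minimal sketch of what I mean (the 30-second interval and app name are just examples; pick a duration comfortably above the per-batch processing time shown in the Streaming UI):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("kafka-harmonizer") // example app name
// example batch interval; must exceed the processing time of each batch
val ssc = new StreamingContext(conf, Seconds(30))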
Upvotes: 0