Reputation: 20090
I am looking to process elements on a queue (Kafka or Amazon Kinesis) and to have multiple operations performed on each element, for example:
For each of these operations I am expecting exactly-once semantics. Is this achievable in Apache Spark, and how?
Upvotes: 2
Views: 733
Reputation: 663
Exactly-once is a side effect of at-least-once processing semantics when the operations are idempotent. In your case, if all 3 operations are idempotent, then you can get exactly-once semantics. The other way to get exactly-once semantics is to wrap all 3 operations and the Kafka offset storage in one transaction, which is not feasible.
https://pkghosh.wordpress.com/2016/05/18/exactly-once-stream-processing-semantics-not-exactly/
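To illustrate what an idempotent operation can look like in practice, here is a minimal sketch of a write keyed on a unique message ID. The processed_events table, the event_id column and the use of PostgreSQL's ON CONFLICT clause are my assumptions for the example, not something from the question:

import java.sql.DriverManager

// Idempotent write: the primary key on event_id makes redelivery of the same
// message a no-op, so at-least-once delivery yields an exactly-once result.
// Assumed table: create table processed_events(event_id text primary key, payload text)
def writeIdempotent(jdbcUrl: String, user: String, password: String,
                    eventId: String, payload: String): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl, user, password)
  try {
    val stmt = conn.prepareStatement(
      "insert into processed_events(event_id, payload) values (?, ?) " +
      "on conflict (event_id) do nothing")
    stmt.setString(1, eventId)
    stmt.setString(2, payload)
    stmt.executeUpdate()
  } finally {
    conn.close()
  }
}

With a write like this, replaying the same message under at-least-once delivery leaves the table unchanged, which is what gives the effective exactly-once result.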
Upvotes: 0
Reputation: 63249
You will need to manage unique keys manually, but given that approach it is possible when using KafkaUtils.createDirectStream.
From the Spark docs (http://spark.apache.org/docs/latest/streaming-kafka-integration.html):
Approach 2: Direct Approach (No Receivers)
each record is received by Spark Streaming effectively exactly once despite failures.
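As a rough sketch of what the direct approach looks like when wired up, assuming the Kafka 0.8 integration that the linked docs and blog post describe (broker addresses and the topic name below are placeholders):

import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("exactly-once-example")
val ssc = new StreamingContext(conf, Seconds(5))

// No receiver: Spark computes the offset ranges for each batch itself, which is
// what gives the "effectively exactly once" receive guarantee quoted above.
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("my-topic"))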
And here is the idempotency requirement (e.g. saving a unique key per message in Postgres):
In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).
Here is an idea of the kind of code you would need to manage the unique keys (from http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ ):
stream.foreachRDD { rdd =>
  rdd.foreachPartition { iter =>
    // make sure connection pool is set up on the executor before writing
    SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    iter.foreach { case (key, msg) =>
      DB.autoCommit { implicit session =>
        // the unique key for idempotency is just the text of the message itself, for example purposes
        sql"insert into idem_data(msg) values (${msg})".update.apply
      }
    }
  }
}
In a real application a unique per-message ID would need to be managed, rather than relying on the message text as the key as in the example above.
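If you instead go the atomic-transaction route mentioned in the docs quote, the idea is to commit the results and the Kafka offsets in the same database transaction. A rough sketch in the spirit of the same Cloudera post, reusing SetupJdbc from the snippet above, with results and stored_offsets as assumed table names:

import org.apache.spark.TaskContext
import org.apache.spark.streaming.kafka.HasOffsetRanges
import scalikejdbc._

stream.foreachRDD { rdd =>
  // capture the per-partition offset ranges on the driver before distributing work
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  rdd.foreachPartition { iter =>
    val osr = offsetRanges(TaskContext.get.partitionId)
    SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)
    // results and the ending offset go into one transaction: if the insert fails,
    // the offset is not advanced and the batch is reprocessed on restart
    DB.localTx { implicit session =>
      iter.foreach { case (key, msg) =>
        sql"insert into results(msg) values (${msg})".update.apply()
      }
      sql"""update stored_offsets set off = ${osr.untilOffset}
            where topic = ${osr.topic} and part = ${osr.partition}""".update.apply()
    }
  }
}

On restart you would read the starting offsets back from stored_offsets and pass them to createDirectStream, so processing resumes exactly where the last committed transaction ended.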
Upvotes: 1