Edmondo

Reputation: 20090

Multiple consumers exactly-once processing with Apache Spark Streaming

I am looking to process elements on a queue (Kafka or Amazon Kinesis) and to have multiple operations performed on each element, for example:

For each of these operations I expect exactly-once semantics. Is this achievable in Apache Spark, and if so, how?

Upvotes: 2

Views: 733

Answers (2)

Pranab

Reputation: 663

Exactly-once is a side effect of at-least-once processing semantics when the operations are idempotent. In your case, if all 3 operations are idempotent, then you get exactly-once semantics. The other way to get exactly-once semantics is to wrap all 3 operations plus the Kafka offset storage in a single transaction, which is not feasible.
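To make the first point concrete, here is a minimal, self-contained sketch (plain Scala, no Spark or Kafka; `IdempotentSink` and its methods are hypothetical names for illustration) of why idempotent writes turn at-least-once delivery into effectively exactly-once processing: redelivering a message keyed by a unique ID leaves the store unchanged.

```scala
import scala.collection.mutable

// A toy sink keyed by a unique message ID: writing the same message
// twice (an at-least-once redelivery) has no additional effect.
object IdempotentSink {
  private val store = mutable.Map.empty[String, String]

  def write(id: String, payload: String): Unit =
    store.getOrElseUpdate(id, payload) // no-op if the id is already present

  def size: Int = store.size
}
```

Replaying the same `(id, payload)` any number of times leaves `size` unchanged, which is exactly the property the three operations would need.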

https://pkghosh.wordpress.com/2016/05/18/exactly-once-stream-processing-semantics-not-exactly/

Upvotes: 0

WestCoastProjects

Reputation: 63249

You will need to manage unique keys manually, but given that approach it is possible when using

KafkaUtils.createDirectStream

From the Spark docs http://spark.apache.org/docs/latest/streaming-kafka-integration.html :

Approach 2: Direct Approach (No Receivers)

each record is received by Spark Streaming effectively exactly once despite failures.

The docs also state the idempotency requirement, satisfied here by, e.g., saving a unique key per message in Postgres:

In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets (see Semantics of output operations in the main programming guide for further information).
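The "atomic transaction that saves results and offsets" alternative from that quote can be sketched as a toy in-memory simulation (plain Scala; `TransactionalSink` and its methods are hypothetical names, and a `synchronized` block stands in for a real database transaction): the result and the Kafka offset are committed together, so a replayed batch whose offset was already committed is detected and skipped.

```scala
import scala.collection.mutable

// Toy simulation of committing results and offsets atomically:
// a replayed record whose offset has already been committed is skipped.
object TransactionalSink {
  private val results = mutable.ListBuffer.empty[String]
  private val committedOffsets = mutable.Map.empty[Int, Long] // partition -> last offset

  def commit(partition: Int, offset: Long, result: String): Boolean =
    synchronized { // stands in for one DB transaction
      if (committedOffsets.get(partition).exists(_ >= offset)) {
        false // offset already committed: replay detected, skip
      } else {
        results += result                    // save the result ...
        committedOffsets(partition) = offset // ... and the offset together
        true
      }
    }

  def resultCount: Int = results.size
}
```

In a real deployment both writes would go into a single transaction in the same datastore, which is why the docs pair "results and offsets" in one atomic operation.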

Here is an idea of the kind of code you would need to manage the unique keys (from http://blog.cloudera.com/blog/2015/03/exactly-once-spark-streaming-from-apache-kafka/ ):

    stream.foreachRDD { rdd =>
      rdd.foreachPartition { iter =>
        // make sure connection pool is set up on the executor before writing
        SetupJdbc(jdbcDriver, jdbcUrl, jdbcUser, jdbcPassword)

        iter.foreach { case (key, msg) =>
          DB.autoCommit { implicit session =>
            // the unique key for idempotency is just the text of the message itself, for example purposes
            sql"insert into idem_data(msg) values (${msg})".update.apply
          }
        }
      }
    }

A unique per-message ID would need to be managed.
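One common way to obtain such an ID without changing producers (an assumption on my part, not something the Cloudera post requires) is to derive it from the Kafka topic/partition/offset triple, since that triple uniquely identifies a record and is stable across replays:

```scala
// Derive a deterministic per-message key from Kafka coordinates.
// topic + partition + offset uniquely identify a record, so a replayed
// record always maps to the same key and the insert stays idempotent.
def messageKey(topic: String, partition: Int, offset: Long): String =
  s"$topic-$partition-$offset"
```

Storing this key in a column with a unique constraint (instead of the raw message text above) would then reject duplicate inserts on replay.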

Upvotes: 1

Related Questions