Jessie

Reputation: 493

How to configure Apache Kafka to send data at a specified time?

Let's consider the following abstract schema of how Apache Kafka works:

Producers -> (Send messages) -> Apache Kafka -> (Resend to customers) -> Customers

Is it possible to configure Kafka to send messages to customers at a specified time?

The second question: is it possible to roll back a message to Kafka from the customer?

Upvotes: 3

Views: 4269

Answers (4)

Antonios Chalkiopoulos

Reputation: 868

If I understand the question correctly, you want to send data to a customer at a particular point in time. If you are using Lenses for Apache Kafka, it could be as simple as:

    # cron the following to execute daily at 24:00
    curl -XGET "http://lenses-host:port/api/sql/data?sql=SELECT * FROM topicA WHERE customer = 'customerA' AND _ts > 'yyyy-mm-dd hh:mm:ss'" > customerA.json
    send [email protected] customerA.json

So, to answer the first part of the question: you need to build your own consumer logic. Rollback is not supported in Kafka, although you could easily do something like:

INSERT INTO topicB SELECT * FROM topicA WHERE _ts < '2017-12-10 00:00:00'

So you can easily create a new topic from another one, but there are no rollback semantics.

Upvotes: 2

ppatierno

Reputation: 10065

As others have already replied, Kafka doesn't push messages to consumers; consumers pull messages from Kafka. This means you need to write your consumer to pull messages from the Kafka topic at specific times (or intervals). Regarding rollback, what do you mean? Perhaps that the consumer gets messages from Kafka but then wants to re-read the same messages later, because errors happened during the first processing? If so, there are two aspects to consider about Kafka:

  • Kafka has a configurable retention period for messages (even days), which means that when the consumer gets messages, they are not deleted from the topic partition
  • instead, when the consumer gets messages, it has to commit the offset so that it can track the latest message read from the topic partition. This commit can be done automatically or manually, so you can commit the offset only if your processing went well. In any case, you are able to rewind the stream and restart reading the topic partition from a specific offset.
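The rewind described above can be sketched with the confluent-kafka Python client. This is a minimal sketch, not a complete implementation: the broker address, topic, group id, and the helper names `restart_offset` and `replay_from` are illustrative assumptions, and `replay_from` needs a running broker to actually do anything.

```python
def restart_offset(committed, earliest):
    """Pick the offset to resume from: the committed offset if one
    exists and is still retained, otherwise the earliest retained offset."""
    return committed if committed is not None and committed >= earliest else earliest

def replay_from(topic, partition, offset, conf):
    """Re-read a partition starting from `offset` (requires a running broker).
    Assigning a TopicPartition with an explicit offset starts consumption there."""
    from confluent_kafka import Consumer, TopicPartition
    c = Consumer(conf)  # e.g. {'bootstrap.servers': 'localhost:9092',
                        #       'group.id': 'replay-group',
                        #       'enable.auto.commit': False}
    c.assign([TopicPartition(topic, partition, offset)])
    msg = c.poll(5.0)
    if msg and not msg.error():
        # commit only after the message was processed successfully
        c.commit(asynchronous=False)
    c.close()
    return msg
```

With auto-commit disabled, a crash before `commit` simply means the same messages are delivered again on restart, which is the closest thing Kafka offers to a "rollback".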

Upvotes: 1

shakeel

Reputation: 901

Extending Robin's answer:

Kafka won't push messages to consumers; consumers need to pull messages from Kafka.

Look at the Python snippet below, which reads messages from Kafka:

from confluent_kafka import Consumer, KafkaError

# broker, group, and topic names are examples
c = Consumer({'bootstrap.servers': 'localhost:9092', 'group.id': 'mygroup'})
c.subscribe(['mytopic'])

running = True
while running:
    msg = c.poll(timeout=1.0)  # wait up to 1 second for a message
    if msg is None:            # poll timed out without a message
        continue
    if not msg.error():
        print('Received message: %s' % msg.value().decode('utf-8'))
    elif msg.error().code() != KafkaError._PARTITION_EOF:
        print(msg.error())
        running = False
c.close()

In the above snippet, msg = c.poll(timeout=1.0) waits up to one second for a message from Kafka. You can increase the timeout to any number of seconds, which sets the interval at which the consumer checks for new messages.

If you want scheduling, you have to call the poll method at the scheduled times.

Note: your session.timeout.ms should be greater than the poll interval.
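One way to call poll at scheduled times is to sleep until the next run. This is a stdlib-only sketch; the helper name `seconds_until` is illustrative and not part of any Kafka API:

```python
import datetime

def seconds_until(run_at, now):
    """Seconds from `now` until the next occurrence of time-of-day `run_at`."""
    target = datetime.datetime.combine(now.date(), run_at)
    if target <= now:               # today's slot already passed: use tomorrow's
        target += datetime.timedelta(days=1)
    return (target - now).total_seconds()

# e.g. sleep until midnight, then run the poll loop once:
# time.sleep(seconds_until(datetime.time(0, 0), datetime.datetime.now()))
```

Because the messages stay in the topic until retention expires, a consumer that only wakes up once a day still sees everything produced since its last committed offset.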

Upvotes: 0

Robin Moffatt

Reputation: 32050

Consumers pull messages from Kafka; Kafka does not push ("send") them. So it's up to your consumers to pull the data when they want it.

Upvotes: 1
