
Reputation: 50

pySpark Kafka Direct Streaming update Zookeeper / Kafka Offset

I'm currently working with Kafka / Zookeeper and pySpark (1.6.0). I have successfully created a Kafka consumer that uses KafkaUtils.createDirectStream().

The streaming itself works without problems, but I noticed that the offsets of my Kafka topics are not updated to the current position after I have consumed some messages.

This is a problem for us, because we need the offsets updated in order to have monitoring in place.

In the Spark documentation I found this example and comment:

    offsetRanges = []

    def storeOffsetRanges(rdd):
        global offsetRanges
        offsetRanges = rdd.offsetRanges()
        return rdd

    def printOffsetRanges(rdd):
        for o in offsetRanges:
            print("%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset))


You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.

Here is the documentation:

I found a solution in Scala, but I can't find an equivalent for python. Here is the Scala example:


My question is: how can I update Zookeeper from that point on?

Upvotes: 2

Views: 2504

Answers (2)

Anton Protopopov

Reputation: 31692

I wrote some functions to save and read Kafka offsets with the Python kazoo library.

The first function gets a singleton instance of the Kazoo client:


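# Placeholder: replace with your own ZooKeeper connection string (host:port).
ZOOKEEPER_SERVERS = "127.0.0.1:2181"

def get_zookeeper_instance():
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        # The client must be started before it can read or write nodes.
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']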

Then functions to read and write offsets:

def read_offsets(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        # One child node per partition, holding the last saved offset.
        for partition in zk.get_children(f'/consumers/{topic}'):
            topic_partition = TopicAndPartition(topic, int(partition))
            # zk.get() returns (data, stat); the data is the offset as bytes.
            offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
            from_offsets[topic_partition] = offset
    return from_offsets

def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/{offset.topic}/{offset.partition}"
        # Create the node on first use; zk.set() fails if it does not exist.
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())
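
The write step can also be split into a pure function that is easy to check without a running ZooKeeper. This is only a minimal sketch, assuming the same /consumers/{topic}/{partition} layout as in save_offsets. `OffsetRange` below is a hypothetical namedtuple stand-in for the objects returned by rdd.offsetRanges(), and `offset_updates` is an illustrative helper name, not part of pyspark or kazoo:

```python
from collections import namedtuple

# Hypothetical stand-in for pyspark's OffsetRange, carrying the same four
# attributes that are used in this thread.
OffsetRange = namedtuple("OffsetRange", "topic partition fromOffset untilOffset")


def offset_updates(offset_ranges):
    """Return (znode path, payload bytes) pairs for one batch's offset ranges."""
    updates = []
    for o in offset_ranges:
        path = "/consumers/{}/{}".format(o.topic, o.partition)
        # ZooKeeper stores bytes, so the offset is written as an ASCII number.
        updates.append((path, str(o.untilOffset).encode()))
    return updates


ranges = [OffsetRange("test1", 0, 10, 25), OffsetRange("test1", 1, 7, 7)]
for path, payload in offset_updates(ranges):
    print(path, payload)
```

Each pair could then be written to ZooKeeper with the kazoo client's set call.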

Then, before starting the stream, you can read the offsets from Zookeeper and pass them to createDirectStream as the fromOffsets argument:

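from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def main(brokers="", topics=['test1', 'test2']):
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)

    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)

    # "metadata.broker.list" is the broker setting the direct stream expects;
    # brokers must be set to your own host:port list.
    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)

    # Save the offsets of every processed batch back to Zookeeper.
    directKafkaStream.foreachRDD(save_offsets)

    ssc.start()
    ssc.awaitTermination()


if __name__ == "__main__":
    main()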

Upvotes: 5

Gavin Huang

Reputation: 177

I ran into a similar problem. You are right: the direct stream uses the Kafka low-level API directly, which does not update the consumer offset. There are a couple of examples for Scala/Java around, but not for Python. It's easy to do yourself, though. What you need to do is:

  • read from the offset at the beginning
  • save the offset at the end

For example, I save the offset for each partition in Redis by doing:

def save_offset(rdd):
    # offsetRanges() is available on RDDs produced by the direct stream
    for rng in rdd.offsetRanges():
        rng.untilOffset  # save this offset somewhere, keyed by rng.topic and rng.partition

stream.foreachRDD(save_offset)

Then, at the beginning, you can use:

from pyspark.streaming.kafka import TopicAndPartition

fromoffset = {}
topic_partition = TopicAndPartition(topic, partition)
fromoffset[topic_partition] = int(value)  # value is the offset you stored previously
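
The save and restore steps can be tried out without Spark or Redis. The sketch below is only an illustration under stated assumptions: a plain dict stands in for the key-value store, the "topic:partition" key format is made up for this example, and (topic, partition) tuples stand in for TopicAndPartition objects:

```python
def save_offset(store, topic, partition, until_offset):
    """Record the next offset to read for one topic partition."""
    store["{}:{}".format(topic, partition)] = str(until_offset)


def load_offsets(store, topic, partitions):
    """Rebuild {(topic, partition): offset}; partitions never saved are skipped."""
    from_offsets = {}
    for partition in partitions:
        value = store.get("{}:{}".format(topic, partition))
        if value is not None:
            from_offsets[(topic, partition)] = int(value)
    return from_offsets


store = {}  # in-memory stand-in for Redis or another key-value store
save_offset(store, "test1", 0, 25)
save_offset(store, "test1", 1, 7)
restored = load_offsets(store, "test1", [0, 1, 2])
print(restored)
```

In a real job, each tuple key would be turned into a TopicAndPartition before the mapping is passed as fromOffsets.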

For tools that use Zookeeper to track offsets, it's better to save the offsets in Zookeeper. This page: describes how to set the offset. Basically, the Zookeeper node is /consumers/[consumer_name]/offsets/[topic name]/[partition id]. Because we are using directStream, you have to make up a consumer name.
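
The node layout above can be built with a small helper. This is a minimal sketch; `offset_znode` and the consumer name "spark-direct-consumer" are hypothetical names chosen for illustration:

```python
def offset_znode(consumer_name, topic, partition):
    """Build the znode path /consumers/<consumer>/offsets/<topic>/<partition>."""
    return "/consumers/{}/offsets/{}/{}".format(consumer_name, topic, partition)


# The consumer name is made up, since the direct stream has no consumer group.
print(offset_znode("spark-direct-consumer", "test1", 0))
```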

Upvotes: 1
