Girish Gupta

Reputation: 1293

Manually commit offset in kafka Direct Stream in python

I am porting a streaming application written in Scala to Python. I want to manually commit offsets for a DStream. In Scala, this is done like below:

val stream = KafkaUtils.createDirectStream(someConfigs)
stream.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

  // some time later, after outputs have completed
  stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}

But I am unable to find a similar API in Python. Can you please guide me on how I can manually commit offsets using the Python client?

Upvotes: 2

Views: 865

Answers (1)

Girish Gupta

Reputation: 1293

I resolved this by going back to the pyspark 2.2 library, as it has an API to get offsetRanges, and by storing the offsets in Redis. I also had to go back to Python 2.7, as there is no "long" support in Python 3.6.

import redis
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition, KafkaRDD


def get_offset_ranges(topic):
    # Load previously stored offsets from Redis, keyed as "<topic>:offsets",
    # mapping each partition to the last untilOffset we processed.
    ranges = None

    rk = '{topic}:offsets'.format(topic=topic)
    cache = redis.Redis()
    if cache.exists(rk):
        mapping = cache.hgetall(rk)
        ranges = dict()
        for k, v in mapping.items():
            tp = TopicAndPartition(topic, int(k))
            ranges[tp] = long(v)

    return ranges


def update_offset_ranges(offset_ranges):
    # Persist the untilOffset of each processed range back to Redis.
    cache = redis.Redis()
    for rng in offset_ranges:
        rk = '{rng.topic}:offsets'.format(rng=rng)
        print("updating redis key: {}, partition: {}, untilOffset: {}".format(rk, rng.partition, rng.untilOffset))
        cache.hset(rk, rng.partition, rng.untilOffset)


def do_some_work(partition_iter):
    # Placeholder for the real per-partition processing.
    pass


def process_dstream(rdd):
    rdd.foreachPartition(do_some_work)

    # Wrap the underlying Java RDD back into a KafkaRDD so that
    # offsetRanges() is available, then store the ranges in Redis.
    krdd = KafkaRDD(rdd._jrdd, sc, rdd._jrdd_deserializer)
    off_ranges = krdd.offsetRanges()
    for o in off_ranges:
        print(str(o))
    update_offset_ranges(off_ranges)


sc = SparkContext(appName="mytstApp")
ssc = StreamingContext(sc, 1)

kafka_params = {
    "bootstrap.servers": "localhost:9092",
    "group.id": "myUserGroup",
    "enable.auto.commit": "false",
    "auto.offset.reset": "smallest"
}

topic = "mytopic"
offset_ranges = get_offset_ranges(topic)
dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params, fromOffsets=offset_ranges)
dstream.foreachRDD(process_dstream)
# Start the streaming context
ssc.start()

# Wait for the job to finish
try:
    ssc.awaitTermination()
except Exception as e:
    ssc.stop()
    raise  # re-raise to exit with an error condition
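
As a side note, the Spark Streaming + Kafka integration guide also documents a transform()-based pattern for capturing offset ranges in Python without reconstructing a KafkaRDD from rdd._jrdd by hand. I have not run this variant myself, but a minimal sketch of it, reusing ssc, topic, kafka_params, do_some_work and update_offset_ranges from the code above (the names offset_ranges_holder, store_offset_ranges and write_offset_ranges are my own), would look like this:

offset_ranges_holder = []


def store_offset_ranges(rdd):
    # transform() sees the raw KafkaRDD, so offsetRanges() is still
    # available here; capture the ranges for use after processing.
    global offset_ranges_holder
    offset_ranges_holder = rdd.offsetRanges()
    return rdd


def write_offset_ranges(rdd):
    # Do the per-batch work first, then persist the captured ranges.
    rdd.foreachPartition(do_some_work)
    update_offset_ranges(offset_ranges_holder)


dstream = KafkaUtils.createDirectStream(ssc, [topic], kafka_params)
dstream.transform(store_offset_ranges).foreachRDD(write_offset_ranges)

Note that transform() must be the first operation applied to the stream, since offsetRanges() only exists on the untouched KafkaRDD before any other transformation runs.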

Upvotes: 2
