Reputation: 50
I'm currently working with Kafka / ZooKeeper and PySpark (1.6.0).
I have successfully created a Kafka consumer that uses KafkaUtils.createDirectStream().
Streaming itself works without problems, but I noticed that my Kafka topics are not updated to the current offset after I have consumed some messages.
This is a problem for us, because we need the offsets updated in order to have monitoring in place.
In the Spark documentation I found this example:
offsetRanges = []

def storeOffsetRanges(rdd):
    global offsetRanges
    offsetRanges = rdd.offsetRanges()
    return rdd

def printOffsetRanges(rdd):
    for o in offsetRanges:
        print "%s %s %s %s" % (o.topic, o.partition, o.fromOffset, o.untilOffset)
You can use this to update Zookeeper yourself if you want Zookeeper-based Kafka monitoring tools to show progress of the streaming application.
Here is the documentation:
I found a solution in Scala, but I can't find an equivalent for Python. Here is the Scala example:
So my question is: how can I update ZooKeeper from that point on?
Upvotes: 2
Views: 2504
Reputation: 31692
I wrote some functions to save and read Kafka offsets with the Python kazoo library.
First, a function to get a singleton Kazoo client:
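ZOOKEEPER_SERVERS = "127.0.0.1:2181"  # assumed address; adjust for your cluster

def get_zookeeper_instance():
    from kazoo.client import KazooClient

    if 'KazooSingletonInstance' not in globals():
        globals()['KazooSingletonInstance'] = KazooClient(ZOOKEEPER_SERVERS)
        # The client must be started before it can be used
        globals()['KazooSingletonInstance'].start()
    return globals()['KazooSingletonInstance']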
Then functions to read and write offsets:
def read_offsets(zk, topics):
    from pyspark.streaming.kafka import TopicAndPartition

    from_offsets = {}
    for topic in topics:
        for partition in zk.get_children(f'/consumers/{topic}'):
            topic_partition = TopicAndPartition(topic, int(partition))
            offset = int(zk.get(f'/consumers/{topic}/{partition}')[0])
            from_offsets[topic_partition] = offset
    return from_offsets

def save_offsets(rdd):
    zk = get_zookeeper_instance()
    for offset in rdd.offsetRanges():
        path = f"/consumers/{offset.topic}/{offset.partition}"
        # Create the node on first use; zk.set() fails if it does not exist
        zk.ensure_path(path)
        zk.set(path, str(offset.untilOffset).encode())
Then, before starting the stream, you can read the offsets from ZooKeeper and pass them to createDirectStream as the fromOffsets argument:
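from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def main(brokers="", topics=['test1', 'test2']):
    # brokers: comma-separated host:port list of your Kafka brokers
    sc = SparkContext(appName="PythonStreamingSaveOffsets")
    ssc = StreamingContext(sc, 2)
    zk = get_zookeeper_instance()
    from_offsets = read_offsets(zk, topics)
    directKafkaStream = KafkaUtils.createDirectStream(
        ssc, topics, {"metadata.broker.list": brokers},
        fromOffsets=from_offsets)
    # Write the end offsets of every batch back to ZooKeeper
    directKafkaStream.foreachRDD(save_offsets)
    ssc.start()
    ssc.awaitTermination()

if __name__ == "__main__":
    main()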
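One thing to watch for on the very first run: nothing has been saved under /consumers/<topic> yet, and kazoo raises an error when listing the children of a node that does not exist. Below is a minimal sketch of a more forgiving read, assuming the same path layout as above. The names read_offsets_safe, make_key and FakeZk are made up for illustration; FakeZk is only an in-memory stand-in so the sketch runs without a ZooKeeper server, and it mimics just the kazoo-style exists/get_children/get calls used here.

```python
def read_offsets_safe(zk, topics, make_key=lambda topic, partition: (topic, partition)):
    """Read saved offsets, skipping topics that have no saved offsets yet.

    zk is any client with kazoo-style exists/get_children/get methods.
    make_key builds the dictionary key; pass pyspark's TopicAndPartition
    to get a dictionary that can be used as fromOffsets.
    """
    from_offsets = {}
    for topic in topics:
        base = "/consumers/{}".format(topic)
        if zk.exists(base) is None:  # nothing saved for this topic yet
            continue
        for partition in zk.get_children(base):
            data, _stat = zk.get("{}/{}".format(base, partition))
            from_offsets[make_key(topic, int(partition))] = int(data)
    return from_offsets


class FakeZk(object):
    """Hypothetical in-memory stand-in for a started KazooClient."""

    def __init__(self, nodes=None):
        self.nodes = dict(nodes or {})  # full path -> bytes payload

    def exists(self, path):
        prefix = path + "/"
        found = path in self.nodes or any(p.startswith(prefix) for p in self.nodes)
        return True if found else None

    def get_children(self, path):
        prefix = path + "/"
        return sorted({p[len(prefix):].split("/")[0]
                       for p in self.nodes if p.startswith(prefix)})

    def get(self, path):
        return self.nodes[path], None


# Offsets exist for test1 only; test2 has never been saved.
zk = FakeZk({"/consumers/test1/0": b"25", "/consumers/test1/1": b"9"})
offsets = read_offsets_safe(zk, ["test1", "test2"])
```

With a real client you would call read_offsets_safe(get_zookeeper_instance(), topics, make_key=TopicAndPartition) and pass the result as fromOffsets.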
Upvotes: 5
Reputation: 177
I ran into a similar problem. You are right: using directStream means using the Kafka low-level API directly, which does not update the consumer offset. There are a couple of examples for Scala/Java around, but not for Python. It's easy to do yourself, though: save the offsets as each batch is processed, and read them back when the application starts.
For example, I save the offset for each partition in Redis by doing:
stream.foreachRDD(lambda rdd: save_offset(rdd))

def save_offset(rdd):
    ranges = rdd.offsetRanges()
    for rng in ranges:
        rng.untilOffset  # save offset somewhere
Then, at startup, you can use:
from pyspark.streaming.kafka import TopicAndPartition

fromoffset = {}
topic_partition = TopicAndPartition(topic, partition)
fromoffset[topic_partition] = int(value)  # value is the offset read back from wherever you stored it
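To make the Redis round trip concrete, here is a rough sketch under a few assumptions: one Redis hash per topic with the key scheme kafka:offsets:<topic> (made up for this example), the partition as the hash field and untilOffset as the value. It only relies on the hset and hgetall calls of a redis-py style client. OffsetRange and FakeRedis below are stand-ins so the sketch runs without Spark or a Redis server; they are not the real classes.

```python
from collections import namedtuple

# Stand-in for pyspark.streaming.kafka.OffsetRange (same attribute names).
OffsetRange = namedtuple("OffsetRange", "topic partition fromOffset untilOffset")


def redis_key(topic):
    # Hypothetical key scheme: one Redis hash per topic.
    return "kafka:offsets:{}".format(topic)


def save_offsets_to_redis(r, offset_ranges):
    """Store the end offset of every partition in the batch."""
    for o in offset_ranges:
        r.hset(redis_key(o.topic), str(o.partition), str(o.untilOffset))


def load_offsets_from_redis(r, topics):
    """Return {(topic, partition): offset} for everything saved so far."""
    offsets = {}
    for topic in topics:
        for partition, value in r.hgetall(redis_key(topic)).items():
            offsets[(topic, int(partition))] = int(value)
    return offsets


class FakeRedis(object):
    """Hypothetical in-memory stand-in; returns bytes like redis-py does by default."""

    def __init__(self):
        self.hashes = {}

    def hset(self, name, key, value):
        self.hashes.setdefault(name, {})[key.encode()] = value.encode()

    def hgetall(self, name):
        return dict(self.hashes.get(name, {}))


r = FakeRedis()
save_offsets_to_redis(r, [OffsetRange("test1", 0, 10, 25),
                          OffsetRange("test1", 1, 5, 9)])
restored = load_offsets_from_redis(r, ["test1", "test2"])
```

In the streaming job, the tuple keys would be turned into TopicAndPartition objects before being passed as fromOffsets, for example {TopicAndPartition(t, p): off for (t, p), off in restored.items()}.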
For tools that use ZooKeeper to track offsets, it's better to save the offsets in ZooKeeper. This page: describes how to set the offset. Basically, the ZooKeeper node is /consumers/[consumer_name]/offsets/[topic name]/[partition id]. Since we are using directStream, you have to make up a consumer name.
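A minimal sketch of writing offsets in that node layout, assuming a kazoo-style client with ensure_path and set. The consumer name "spark-streaming" and the helper names are made up for illustration, and OffsetRange and RecordingZk are stand-ins so the sketch runs without Spark or ZooKeeper.

```python
from collections import namedtuple

# Stand-in for pyspark.streaming.kafka.OffsetRange (same attribute names).
OffsetRange = namedtuple("OffsetRange", "topic partition fromOffset untilOffset")


def offset_path(consumer_name, topic, partition):
    """Build /consumers/[consumer_name]/offsets/[topic name]/[partition id]."""
    return "/consumers/{}/offsets/{}/{}".format(consumer_name, topic, partition)


def save_offsets_for_consumer(zk, consumer_name, offset_ranges):
    """Write the end offset of each partition under the consumer's offsets node."""
    for o in offset_ranges:
        path = offset_path(consumer_name, o.topic, o.partition)
        zk.ensure_path(path)  # create the node (and parents) if missing
        zk.set(path, str(o.untilOffset).encode())


class RecordingZk(object):
    """Hypothetical in-memory stand-in for a started KazooClient."""

    def __init__(self):
        self.nodes = {}

    def ensure_path(self, path):
        self.nodes.setdefault(path, b"")

    def set(self, path, value):
        if path not in self.nodes:
            raise KeyError(path)  # the stand-in refuses to set a missing node
        self.nodes[path] = value


zk = RecordingZk()
save_offsets_for_consumer(zk, "spark-streaming",
                          [OffsetRange("test1", 0, 10, 25)])
```

With a real, started KazooClient this could be hooked up as stream.foreachRDD(lambda rdd: save_offsets_for_consumer(zk, "spark-streaming", rdd.offsetRanges())). Whether a given monitoring tool picks the values up depends on the tool, so treat the layout as something to verify against it.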
Upvotes: 1