Reputation: 1921
I am trying to read from a Kafka broker with Spark Streaming, but I am facing some issues.
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

def spark_streaming_from_STABLE_kafka_topic():
    conf = SparkConf().setMaster("spark://antonis-dell:7077").setAppName("Kafka_Spark")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    ssc = StreamingContext(sc, 2)  # 2-second batch interval

    topic = "stable_topic"
    kvs = KafkaUtils.createDirectStream(ssc,
                                        [topic],
                                        {"metadata.broker.list": "my-broker",
                                         "auto.offset.reset": "smallest"},
                                        keyDecoder=lambda x: x,
                                        valueDecoder=lambda x: x)

    # keep only the message values from the (key, value) pairs
    lines = kvs.window(2, 2).map(lambda x: x[1])
    lines.pprint()
    return ssc

if __name__ == "__main__":
    ssc = StreamingContext.getOrCreate('/home/antonis/Desktop/tmp/checkpoint_v06',
                                       lambda: spark_streaming_from_STABLE_kafka_topic())
    ssc.start()
    ssc.awaitTermination()
The code above does not fetch anything except empty batches:
-------------------------------------------
Time: 2020-05-29 09:32:38
-------------------------------------------
-------------------------------------------
Time: 2020-05-29 09:32:40
-------------------------------------------
The topic stable_topic contains a fixed amount of data; it does not change.

I have another topic that receives data every second. If I use this topic instead of stable_topic and remove "auto.offset.reset": "smallest", then the code fetches data.

I assume there is something wrong with {"auto.offset.reset": "smallest"}, but I cannot figure it out.

Does anyone know what I am doing wrong?
Upvotes: 1
Views: 349
Reputation: 18475
In later versions, smallest was replaced by earliest. Make sure you check the documentation of the version you are using.
Also, the auto.offset.reset configuration will not take effect if the Consumer Group has already consumed some data from the topic stable_topic. Therefore, you might consider changing the group.id in your streaming job.
If you are assigning a new group.id, make sure to set auto.offset.reset to smallest (or earliest in newer versions).
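As a minimal sketch of that change, reusing the broker and topic from your question ("my-fresh-group" is a hypothetical group id):

kafka_params = {
    "metadata.broker.list": "my-broker",
    "group.id": "my-fresh-group",       # hypothetical new consumer group
    "auto.offset.reset": "smallest"     # use "earliest" in newer versions
}
kvs = KafkaUtils.createDirectStream(ssc, [topic], kafka_params)

With a group id that has never committed offsets, auto.offset.reset decides where consumption starts, so the job should read the fixed-size topic from the beginning.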
Upvotes: 2