Reputation: 8744
I am trying to return an array.
I can print the messages array to the console, and I can see it getting populated. However, the code after the finally block appears to be unreachable. What am I doing wrong?
def kafka_messages(topic, partition):
    messages = []
    try:
        consumer = SimpleConsumer(kafka, b"consumer-group",
                                  bytes(topic, "UTF-8"),
                                  partitions=[partition])
        consumer.provide_partition_info()
        consumer.seek(0, 0)
        for message in consumer:
            messages.append(message)  # Messages has values
    finally:
        if kafka:
            kafka.close()
    print(messages)  # Never even gets run
    return messages
Upvotes: 0
Views: 805
Reputation: 8744
Here is what I did:
def kafka_messages(topic, partition):
    messages = []
    try:
        consumer = SimpleConsumer(kafka, b"consumer-group",
                                  bytes(topic, "UTF-8"),
                                  partitions=[partition])
        consumer.provide_partition_info()
        consumer.seek(0, 0)
        # Number of messages still to be read; pending() is part of the SimpleConsumer API
        pending = consumer.pending(partitions=[partition])
        if pending:  # with nothing pending, iterating would block forever
            for message in consumer:
                messages.append(message)
                # Simply break out once every pending message has been collected
                if len(messages) >= pending:
                    break
    finally:
        if kafka:
            kafka.close()
    return messages
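The same idea, reading a known number of items from an iterator that never ends on its own, can be sketched with itertools.islice. This is a minimal, Kafka-free illustration under stated assumptions: itertools.count stands in for the consumer, and take_pending is a hypothetical helper, not part of the Kafka client.

```python
from itertools import count, islice


def take_pending(source, pending):
    """Return at most `pending` items from `source` (hypothetical helper).

    islice stops pulling from `source` once `pending` items have been
    yielded, so an endless iterator is never exhausted.
    """
    if pending <= 0:
        # Nothing to read: never touch the iterator, since it might block forever.
        return []
    return list(islice(source, pending))


# itertools.count() never terminates, much like iterating over the consumer.
print(take_pending(count(), 3))  # [0, 1, 2]
```

Because islice does not consume an extra item after the last one it yields, the next read from the same iterator picks up exactly where this one stopped.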
Upvotes: 0
Reputation: 328674
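There are two possible reasons for this behavior:
1. The loop doesn't terminate (i.e. consumer doesn't stop returning elements).
2. An exception is raised inside the try block: the finally block still runs, but the exception then propagates, so the print and the return after it are skipped.
Add a print('Loop terminated') right before the line finally: to find out whether the loop terminates.
If it doesn't, then you need to read the documentation for SimpleConsumer to find out how to check whether it has more elements so you can terminate the loop.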
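Besides a loop that never ends, an exception raised inside the try block also skips everything after finally: the finally clause runs, then the exception keeps propagating. A small self-contained sketch of that Python behavior (collect and its failing input are hypothetical, standing in for the consumer loop):

```python
def collect(items, log):
    """Copy `items` into a list, recording which parts of the function ran."""
    collected = []
    try:
        for item in items:
            if item is None:
                # Simulates the consumer raising in the middle of iteration.
                raise ValueError("bad message")
            collected.append(item)
    finally:
        log.append("finally ran")  # always executes, like kafka.close()
    log.append("after finally")  # skipped when the try block raised
    return collected


log = []
print(collect([1, 2], log), log)  # [1, 2] ['finally ran', 'after finally']

log = []
try:
    collect([1, None, 3], log)
except ValueError:
    pass
print(log)  # ['finally ran']
```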
[EDIT] Looking at the source for SimpleConsumer, it seems that there is a timeout (default is ITER_TIMEOUT_SECONDS) when there is no message, but the code looks odd/broken: if iter_timeout is None, then the code will sleep and the loop never terminates.
So try to set iter_timeout to something small when you create the instance, and the loop should stop.
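The timeout behavior described above can be imitated without a Kafka broker. In this sketch a queue.Queue stands in for the partition and iterate_with_timeout is a hypothetical helper: it waits at most iter_timeout seconds for each message and ends the loop when none arrives, which is roughly what a small iter_timeout should give you with SimpleConsumer. With the real class, the change would presumably be passing iter_timeout=... to the SimpleConsumer constructor; check the signature in your kafka-python version.

```python
import queue


def iterate_with_timeout(source, iter_timeout):
    """Yield items from `source` until none arrives within `iter_timeout` seconds."""
    while True:
        try:
            # Block for at most iter_timeout seconds waiting for the next item.
            item = source.get(timeout=iter_timeout)
        except queue.Empty:
            # Timed out: treat it as "no more messages" and end the loop.
            return
        yield item


partition = queue.Queue()
for msg in ("a", "b", "c"):
    partition.put(msg)

messages = list(iterate_with_timeout(partition, iter_timeout=0.1))
print(messages)  # ['a', 'b', 'c']
```

The trade-off is the same as with the real consumer: a small timeout ends the loop quickly once the partition is drained, but a message that arrives slightly later than the timeout will be missed by this pass.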
Upvotes: 1