Ahmed


Apache Pulsar Client - Broker notification of Closed consumer - how to resume data feed?

TL;DR: We use the Python client library to subscribe to a Pulsar topic. When something happens server-side, the logs show "Broker notification of Closed consumer". According to the logs, the subscription then appears to be re-established, but we later find that the backlog on the cluster keeps growing because no messages are being delivered to our subscription.

We use an Apache Pulsar cluster that is opaque to us; it has a namespace defined where we publish and consume topics. The cluster keeps losing its connection with our consumer.

We have a python client consuming from a topic (with one Pulsar Client subscription per thread).

When something goes wrong on the Pulsar cluster, we see the following entry in our client logs:

"Broker notification of Closed consumer"

followed by:

"Created connection for pulsar://houpulsar05.mycompany.com:6650"

...for every thread in our agent.

Then we see the usual periodic log entries like this:

```
{"log":"2022-09-01 04:23:30.269 INFO [139640375858944] ConsumerStatsImpl:63 | Consumer [persistent://tenant/namespace/topicname, subscription-name, 0] , ConsumerStatsImpl (numBytesRecieved_ = 0, totalNumBytesRecieved_ = 6545742, receivedMsgMap_ = {}, ackedMsgMap_ = {}, totalReceivedMsgMap_ = {[Key: Ok, Value: 3294], }, totalAckedMsgMap_ = {[Key: {Result: Ok, ackType: 0}, Value: 3294], })\n","stream":"stdout","time":"2022-09-01T04:23:30.270009746Z"}
```

This suggests that a connection has been re-established, possibly to a different broker.
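In that stats line, `numBytesRecieved_ = 0` and an empty `receivedMsgMap_` suggest that nothing arrived during the reporting interval, while the `total...` counters stay frozen. A minimal sketch, assuming the agent's stdout lines look like the one quoted above, of spotting this from the logs instead of waiting for the Grafana backlog alert: count consecutive zero-byte stats lines per topic/subscription. The function `find_stalled` and the default threshold of 3 are hypothetical choices. On a topic that is legitimately quiet, zero-byte intervals are normal, so a hit is only a hint to go check the backlog.

```python
import re
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

# Matches the ConsumerStatsImpl line format quoted above. The field name keeps
# the client's own spelling "Recieved"; lowercase "num" avoids matching
# "totalNumBytesRecieved_".
STATS_RE = re.compile(
    r"Consumer \[(?P<topic>[^,\]]+), (?P<sub>[^,\]]+), \d+\]"
    r".*?numBytesRecieved_ = (?P<bytes>\d+)"
)


def find_stalled(lines: Iterable[str], threshold: int = 3) -> List[Tuple[str, str]]:
    """Return (topic, subscription) pairs whose most recent `threshold` or more
    consecutive stats lines all reported zero bytes received."""
    zero_streak: Dict[Tuple[str, str], int] = defaultdict(int)
    for line in lines:
        m = STATS_RE.search(line)
        if not m:
            continue  # not a consumer stats line
        key = (m.group("topic"), m.group("sub"))
        if int(m.group("bytes")) == 0:
            zero_streak[key] += 1
        else:
            zero_streak[key] = 0  # data arrived, reset the streak
    return [key for key, streak in zero_streak.items() if streak >= threshold]
```

The per-interval byte count is used rather than the totals because it drops to zero as soon as delivery stops, whereas a frozen total only shows up by comparing lines.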

However, no messages are consumed. We have a Grafana dashboard alert on topic and subscription backlog; eventually the backlog crosses a count or rate threshold and alerts us that there is a problem. When we restart our agent, the subscription is re-established and the backlog immediately heads to 0.

Has anyone experienced such an issue?

Our code is typical:

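```python
import pulsar

# Service URL as seen in our logs; my_consumer_type and my_agent_name are placeholders
client = pulsar.Client('pulsar://houpulsar05.mycompany.com:6650')

consumer = client.subscribe(
    topic='my-topic',
    subscription_name='my-subscription',
    consumer_type=my_consumer_type,  # e.g. pulsar.ConsumerType.Shared
    consumer_name=my_agent_name,
)

while True:
    msg = consumer.receive()  # blocks until a message arrives
    ex = msg.value()
    # ... process ex ...
    consumer.acknowledge(msg)
```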

I haven't yet found a readily available way (docker-compose or otherwise) to run a multi-cluster Pulsar installation locally on Docker Desktop, so that I could kill a broker and see how the consumer reacts.


Answers (1)

Yunze Xu


Currently the Python client only supports configuring a single broker address and does not support retrying the lookup yet. Here are two related PRs to support it:

Therefore, a multi-node cluster might behave no differently from a standalone instance here.

If you specified only one broker in the service URL, you can simply test this with a standalone instance: run a consumer and a producer that sends messages periodically, then restart the standalone. The "Broker notification of Closed consumer" log appears when the broker actively closes the consumer. For example, if your consumer sends a SEEK command (via a seek call), the broker disconnects the consumer and this log line appears.
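Until lookup retry is supported, a blunt stopgap that mirrors the "restart the agent" fix from the question is to recreate the consumer when nothing has arrived for a long time. This is a hypothetical sketch, not part of the Pulsar client: the class `ResubscribingConsumer` and its parameters are made up for illustration. It polls with `receive(timeout_millis=...)`, and the exception type raised on a receive timeout is passed in as `timeout_exc`, because it may differ between client versions.

```python
import time
from typing import Any, Callable, Type


class ResubscribingConsumer:
    """Hypothetical wrapper: if no message arrives for `idle_seconds`,
    close the current consumer and build a fresh one via `make_consumer`."""

    def __init__(
        self,
        make_consumer: Callable[[], Any],
        timeout_exc: Type[BaseException],
        idle_seconds: float = 300.0,
        poll_millis: int = 1000,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._make_consumer = make_consumer
        self._timeout_exc = timeout_exc  # exception receive() raises on timeout
        self._idle_seconds = idle_seconds
        self._poll_millis = poll_millis
        self._clock = clock
        self.consumer = make_consumer()
        self.resubscribe_count = 0
        self._last_msg_at = clock()

    def receive(self) -> Any:
        """Block until a message arrives, resubscribing after long silences."""
        while True:
            try:
                msg = self.consumer.receive(timeout_millis=self._poll_millis)
            except self._timeout_exc:
                if self._clock() - self._last_msg_at >= self._idle_seconds:
                    self._resubscribe()
                continue
            self._last_msg_at = self._clock()
            return msg

    def _resubscribe(self) -> None:
        try:
            self.consumer.close()
        except Exception:
            pass  # the old consumer may already be unusable
        self.consumer = self._make_consumer()
        self.resubscribe_count += 1
        self._last_msg_at = self._clock()
```

Assuming your pulsar-client version exposes `pulsar.Timeout`, usage might look like `ResubscribingConsumer(lambda: client.subscribe('my-topic', 'my-subscription'), pulsar.Timeout)`, calling `.receive()` in the loop and acknowledging through `.consumer.acknowledge(msg)`. Choose `idle_seconds` above the topic's normal quiet period, since an idle but healthy topic will also trigger a resubscription. If recreating only the consumer does not help, the factory could also rebuild the `pulsar.Client`. Whether either has the same effect as a full agent restart is an assumption to verify, for example with the standalone restart test described above.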

BTW, it would help to include your Python client version. Also, a GitHub issue might be a better place to track this.
