Bill
Bill

Reputation: 556

How do I subscribe to a NATS subject in Python and keep receiving messages?

I tried out the example below (from this page):

nc = NATS()

await nc.connect(servers=["nats://demo.nats.io:4222"])

future = asyncio.Future()

async def cb(msg):
  nonlocal future
  future.set_result(msg)

await nc.subscribe("updates", cb=cb)
await nc.publish("updates", b'All is Well')
await nc.flush()

# Wait for message to come in
msg = await asyncio.wait_for(future, 1)

But this only seems useful for receiving one message. How would I subscribe and keep receiving messages?

I've also seen the package example, but it seems to just play both sides of the conversation, then quit.

Upvotes: 2

Views: 10269

Answers (2)

wallyqs
wallyqs

Reputation: 7826

You can find a long running service example as well: https://github.com/nats-io/nats.py/blob/master/examples/service.py

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()

    async def disconnected_cb():
        print("Got disconnected...")

    async def reconnected_cb():
        print("Got reconnected...")

    await nc.connect("127.0.0.1",
                     reconnected_cb=reconnected_cb,
                     disconnected_cb=disconnected_cb,
                     max_reconnect_attempts=-1,
                     loop=loop)

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    await nc.subscribe("help", "workers", help_request)

    print("Listening for requests on 'help' subject...")
    for i in range(1, 1000000):
        await asyncio.sleep(1)
        try:
            response = await nc.request("help", b'hi')
            print(response)
        except Exception as e:
            print("Error:", e)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()
    loop.close()

Upvotes: 6

I. Kozlovic
I. Kozlovic

Reputation: 862

Don't know much about python, but looks like you are just waiting for one message and program ends. You should look at the subscriber example here. As you can see, there is a loop to wait forever or for the SIGTERM signal.

Upvotes: 3

Related Questions