HARINI NATHAN
HARINI NATHAN

Reputation: 197

Call_back is not working in nats subscribe in python nats.aio.client

I'm trying simple publish subscribe with nats.aio.client module but callback is not working below is my code from nats.aio.client import Client as NATS

class NAT: def init(self): self.nc = NATS()

async def run(self):
    # nc = NATS()
    print("connection starts")
    await self.nc.connect("demo.nats.io:4222", connect_timeout=10, verbose=True)
    print("connection success")

async def publish_msg(self):
    # nc = NATS()
    print("msg to publish")
    await self.nc.publish("Hello", b'Hellowelcome')

async def subscribe_msg(self):
    async def message_handler(msg):
        print("Hello")
        subject = msg.subject
        reply = msg.reply
        print("Received a message on '{subject} {reply}'".format(
            subject=subject, reply=reply))

    await self.nc.subscribe("Hello", cb=message_handler) 

main file

import asyncio
from nats_client import NAT

nat = NAT()
nats_connection = asyncio.get_event_loop()
nats_connection.run_until_complete(nat.run())
nats_connection.run_until_complete(nat.subscribe_msg())
nats_connection.run_until_complete(nat.publish_msg())
#nats_connection.close()

Let me know if I'm missing anything any help would be appreciated

Upvotes: 1

Views: 1170

Answers (1)

wallyqs
wallyqs

Reputation: 7826

I think your program might be exiting too early so can neither publish nor receive the message, here is a full example on how to start a service in NATS:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):
    nc = NATS()

    async def disconnected_cb():
        print("Got disconnected...")

    async def reconnected_cb():
        print("Got reconnected...")

    await nc.connect("127.0.0.1",
                     reconnected_cb=reconnected_cb,
                     disconnected_cb=disconnected_cb,
                     max_reconnect_attempts=-1,
                     loop=loop)

    async def help_request(msg):
        subject = msg.subject
        reply = msg.reply
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))
        await nc.publish(reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    await nc.subscribe("help", "workers", help_request)

    print("Listening for requests on 'help' subject...")
    for i in range(1, 1000000):
        await asyncio.sleep(1)
        try:
            response = await nc.request("help", b'hi')
            print(response)
        except Exception as e:
            print("Error:", e)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()
    loop.close()

Upvotes: 2

Related Questions