user4292309
user4292309

Reputation:

Nats Subscriber continuously listening to publisher

I'm trying to get a NATS subscriber to continuously listen to published messages.

My Publisher is an API endpoint which can be hit in a browser. My Subscriber is a python app which should run forever, listening for published messages.

My problem is that the subscriber never prints anything out. If i change the run_forever() to loop.close(), it works but immediately shuts down. I know the publisher is working because I can see the print out from the NATS server.

I am running everything up in docker-compose.

My Subscriber:

import asyncio
from nats.aio.client import Client as NATS

async def run(loop):

await nc.connect("nats://nats:4222", loop=loop)

async def message_handler_A(msg):
    print('fsfdsfdsfdsfdsf')

async def message_handler_B(msg):
    print('fdsfdsfdsfdsf')

async def message_handler_C(msg):
    print('sdfdsfdsf')

await nc.subscribe("message_handler_A", cb=message_handler_A)
await nc.subscribe("message_handler_B", cb=message_handler_B)
await nc.subscribe("message_handler_C", cb=message_handler_C)
print('receiving')


if __name__ == '__main__':
    print("RUNNING")

    nc = NATS()
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) 


    loop.run_until_complete(run(loop))
    loop.run_forever()

My Publisher:

import connexion
import six
import json
import asyncio
from nats.aio.client import Client as NATS

from swagger_server import util

async def run(loop):
    nc = NATS()
   # [begin publish_json]
    await nc.connect("nats://nats:4222", loop=loop)
    for i in range(10):
      await nc.publish("message_handler_B", b"")
      await nc.publish("message_handler_C", b"")
      await nc.publish("message_handler_A", b"")

def healthz_get():  # noqa: E501

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) 


    loop.run_until_complete(run(loop))
    loop.close()

    return 'Processing Request'

My docker-compose:

version: '3'

services:
  nats:
    image: 'nats:0.8.0'
    entrypoint: "/gnatsd -DV"
    expose:
      - "4222"
    ports:
      - "4222:4222"
    hostname: nats-server


data_api:
    restart: always
    build: ..\data_api
    image: data_api
    container_name: data_api
    ports:
      - "5022:5022"
  depends_on:
    - "POCpostgres"
    - "queue_app"

queue_app:
    build: ..\queue_app
    image: queue_app
    container_name: queue_app
    ports:
      - "5023:5023"

Upvotes: 3

Views: 3327

Answers (1)

user4292309
user4292309

Reputation:

The answer was to use Nats streaming service: STAN:

Subscriber:

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    sc = STAN()

    # Start session with NATS Streaming cluster using
    # the established NATS connection.
    await nc.connect(io_loop=loop)
    await sc.connect("test-cluster", "client-123", nats=nc)

    # Example async subscriber
    async def cb(msg):
        print("Received a message (seq={}): {}".format(msg.seq, msg.data))

    # Subscribe to get all messages from the beginning.
    await sc.subscribe("greetings", start_at='first', cb=cb)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run(loop))
    loop.run_forever()

Publisher: import connexion import six from swagger_server import util

import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN

async def run(loop):
    nc = NATS()
    sc = STAN()

    # First connect to NATS, then start session with NATS Streaming.
    await nc.connect(io_loop=loop)
    await sc.connect("test-cluster", "client-456", nats=nc)

    await sc.publish("greetings", b'Hello World!')
    await nc.flush(1)
    print("sent")

    await sc.close()
    await nc.close()

def healthz_get():  # noqa: E501

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop) 

    loop.run_until_complete(run(loop))
    loop.close()


    return 'Processing Request'

Upvotes: 6

Related Questions