Reputation:
I'm trying to get a NATS subscriber to continuously listen to published messages.
My Publisher is an API endpoint which can be hit in a browser. My Subscriber is a python app which should run forever, listening for published messages.
My problem is that the subscriber never prints anything out. If i change the run_forever() to loop.close(), it works but immediately shuts down. I know the publisher is working because I can see the print out from the NATS server.
I am running everything up in docker-compose.
My Subscriber:
import asyncio
from nats.aio.client import Client as NATS
async def run(loop):
await nc.connect("nats://nats:4222", loop=loop)
async def message_handler_A(msg):
print('fsfdsfdsfdsfdsf')
async def message_handler_B(msg):
print('fdsfdsfdsfdsf')
async def message_handler_C(msg):
print('sdfdsfdsf')
await nc.subscribe("message_handler_A", cb=message_handler_A)
await nc.subscribe("message_handler_B", cb=message_handler_B)
await nc.subscribe("message_handler_C", cb=message_handler_C)
print('receiving')
if __name__ == '__main__':
print("RUNNING")
nc = NATS()
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.run_forever()
My Publisher:
import connexion
import six
import json
import asyncio
from nats.aio.client import Client as NATS
from swagger_server import util
async def run(loop):
nc = NATS()
# [begin publish_json]
await nc.connect("nats://nats:4222", loop=loop)
for i in range(10):
await nc.publish("message_handler_B", b"")
await nc.publish("message_handler_C", b"")
await nc.publish("message_handler_A", b"")
def healthz_get(): # noqa: E501
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.close()
return 'Processing Request'
My docker-compose:
version: '3'
services:
nats:
image: 'nats:0.8.0'
entrypoint: "/gnatsd -DV"
expose:
- "4222"
ports:
- "4222:4222"
hostname: nats-server
data_api:
restart: always
build: ..\data_api
image: data_api
container_name: data_api
ports:
- "5022:5022"
depends_on:
- "POCpostgres"
- "queue_app"
queue_app:
build: ..\queue_app
image: queue_app
container_name: queue_app
ports:
- "5023:5023"
Upvotes: 3
Views: 3327
Reputation:
The answer was to use Nats streaming service: STAN:
Subscriber:
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
async def run(loop):
nc = NATS()
sc = STAN()
# Start session with NATS Streaming cluster using
# the established NATS connection.
await nc.connect(io_loop=loop)
await sc.connect("test-cluster", "client-123", nats=nc)
# Example async subscriber
async def cb(msg):
print("Received a message (seq={}): {}".format(msg.seq, msg.data))
# Subscribe to get all messages from the beginning.
await sc.subscribe("greetings", start_at='first', cb=cb)
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(run(loop))
loop.run_forever()
Publisher: import connexion import six from swagger_server import util
import asyncio
from nats.aio.client import Client as NATS
from stan.aio.client import Client as STAN
async def run(loop):
nc = NATS()
sc = STAN()
# First connect to NATS, then start session with NATS Streaming.
await nc.connect(io_loop=loop)
await sc.connect("test-cluster", "client-456", nats=nc)
await sc.publish("greetings", b'Hello World!')
await nc.flush(1)
print("sent")
await sc.close()
await nc.close()
def healthz_get(): # noqa: E501
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(run(loop))
loop.close()
return 'Processing Request'
Upvotes: 6