liv2hak

Reputation: 15010

Making a ZeroMQ client and server concurrent using asyncio

I am trying to write a client and server in Python using ZeroMQ with a concurrent model (asyncio).

I have the server code server.py below.

import zmq


class Server:

    def __init__(self, port_number):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REP)
        server_url = "tcp://127.0.0.1:" + str(port_number)
        self.socket.bind(server_url)

    async def receive(self):
        while True:
            msg = await self.socket.recv()
            print("Got", msg)
            self.socket.send(msg)

I have the client code, client.py, below.

import zmq


class Client:
    def __init__(self):
        self.context = zmq.Context()
        self.socket = self.context.socket(zmq.REQ)

    def connect(self):
        self.socket.connect("tcp://127.0.0.1:5000")
        self.socket.connect("tcp://127.0.0.1:6000")

    async def send_msg(self):
        for i in range(10):
            msg = "msg {0}".format(i)
            await self.socket.send(bytes(msg, 'utf-8'))
            print("Sending ", msg)
            await self.socket.recv()            

The code that calls the client and server in asyncio mode is given below.

import asyncio
import time
from server import Server
from client import Client
import logging


async def server(port_num):
    logging.info('Starting server on port 5000')
    s1 = Server(port_num)
    await s1.receive()


async def client():
    logging.info('Starting client')
    c1 = Client()
    c1.send_msg()


async def main_thread():
    await asyncio.gather(server(5000), server(6000), client())

if __name__ == '__main__':
    s = time.perf_counter()
    logging.info("Main    : before creating thread")

    asyncio.run(main_thread())

    logging.info("Main    : before running thread")
    elapsed = time.perf_counter() - s

    print(f'{__file__} finished executing in {elapsed:0.2f} seconds')

When I run the program, it waits indefinitely at await s1.receive() in the function

async def server(port_num):
    logging.info('Starting server on port 5000')
    s1 = Server(port_num)
    await s1.receive()

What am I doing wrong here?

Upvotes: 5

Views: 1171

Answers (1)

rveerd

Reputation: 4016

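You need to import zmq.asyncio and create the ZeroMQ context with zmq.asyncio.Context() instead of zmq.Context(). Sockets created from a plain zmq.Context are blocking: recv() does not return an awaitable, so the call blocks the whole event loop inside the first server and no other coroutine ever gets to run. Sockets created from a zmq.asyncio.Context return awaitables from send() and recv(), so they cooperate with asyncio.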

import asyncio
import zmq
import zmq.asyncio

ctx = zmq.asyncio.Context()
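
For a fuller picture, here is a minimal sketch of how the code from the question might look with that context. It is not a drop-in replacement: passing a shared ctx and the urls into the classes, returning the replies from send_msg, and cancelling the servers once the client is done are all assumptions made to keep the example self-contained. It also awaits send_msg() and calls connect(), which the calling code in the question does not do.

```python
import asyncio
import zmq
import zmq.asyncio


class Server:
    def __init__(self, ctx, url):
        # ctx is a zmq.asyncio.Context, so this socket's send/recv are awaitable
        self.socket = ctx.socket(zmq.REP)
        self.socket.bind(url)

    async def receive(self):
        while True:
            msg = await self.socket.recv()   # yields to the event loop while waiting
            await self.socket.send(msg)      # echo the request back


class Client:
    def __init__(self, ctx):
        self.socket = ctx.socket(zmq.REQ)

    def connect(self, urls):
        # A REQ socket connected to several endpoints round-robins its requests
        for url in urls:
            self.socket.connect(url)

    async def send_msg(self, count):
        replies = []
        for i in range(count):
            await self.socket.send("msg {0}".format(i).encode("utf-8"))
            replies.append(await self.socket.recv())
        return replies


async def main(urls, count=10):
    ctx = zmq.asyncio.Context()
    servers = [Server(ctx, url) for url in urls]            # sockets are bound here
    tasks = [asyncio.create_task(s.receive()) for s in servers]
    client = Client(ctx)
    client.connect(urls)
    try:
        return await client.send_msg(count)                 # awaited, unlike the question
    finally:
        # The servers loop forever, so stop them once the client has finished
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
        ctx.destroy(linger=0)
```

With the ports from the question, this could be run as asyncio.run(main(["tcp://127.0.0.1:5000", "tcp://127.0.0.1:6000"])). The servers are started as tasks rather than passed to asyncio.gather together with the client, because gather would otherwise wait forever on the two endless receive loops.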

Upvotes: 1
