chaos.ct

Reputation: 1091

UDP server with asyncio

I am trying to receive UDP packets in a Python asyncio loop. I am very new to asyncio, so I'm probably doing something wrong, as the callbacks never get called:

import asyncio

class DiscoveryProtocol(asyncio.DatagramProtocol):
    def __init__(self):
        super().__init__()
    def connection_made(self, transport):
        self.transport = transport
    def datagram_received(self, data, addr):
        print(data)

def start_discovery():
    loop = asyncio.get_event_loop()
    t = loop.create_datagram_endpoint(DiscoveryProtocol,local_addr=('',5006))
    loop.run_until_complete(t)
    loop.run_forever()

I can receive packets with plain old sockets (without asyncio).

What am I doing wrong?

Upvotes: 9

Views: 17996

Answers (1)

Terra Kestrel

Reputation: 20334

No accepted answer, so this seems to have atrophied, but it comes up in searches. If someone gets here and wants a working solution, the following snippet illustrates a fully functional UDP server. The write_messages() function is just a test helper: it reads a log file with whatever you want in it and sends each line as a Syslog message to UDP port 514. Running this as a script shows the server listening and printing every message it receives. Update SyslogProtocol with whatever formatting/processing you need.

import socket
import asyncio
import os, random

HOST, PORT = 'localhost', 514


def send_test_message(message: str) -> None:
    """Send one message to the UDP server at (HOST, PORT)."""
    # SOCK_DGRAM selects UDP; the with-block closes the socket after sending.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(message.encode(), (HOST, PORT))


async def write_messages() -> None:
    """Send each line of tests/example.log to the server as a test message."""
    dir_path = os.path.dirname(os.path.realpath(__file__))
    print("writing")
    # The with-block closes the log file once every line has been sent.
    with open(os.path.join(dir_path, "tests/example.log")) as fp:
        for line in fp:
            await asyncio.sleep(random.uniform(0.1, 3.0))
            send_test_message(line)


class SyslogProtocol(asyncio.DatagramProtocol):
    def __init__(self):
        super().__init__()

    def connection_made(self, transport) -> None:
        """Called by asyncio once the endpoint is ready."""
        self.transport = transport

    def datagram_received(self, data, addr) -> None:
        """Main entry point for processing each received message."""
        # Here is where you would push the message to whatever methods/classes you want.
        print(f"Received Syslog message: {data}")


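if __name__ == '__main__':
    # Create the loop explicitly; relying on get_event_loop() to create one
    # is deprecated in recent Python versions.
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    t = loop.create_datagram_endpoint(SyslogProtocol, local_addr=('0.0.0.0', PORT))
    loop.run_until_complete(t)  # Server starts listening
    loop.run_until_complete(write_messages())  # Send the test messages
    loop.run_forever()  # Keep serving until interrupted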
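
For anyone who wants to try the same idea without a log file or a privileged port (binding to 514 usually needs elevated rights on Unix-like systems), here is a minimal sketch using asyncio.run(). The names CollectorProtocol and run_demo are made up for illustration and are not part of the code above. It binds to port 0 so the OS picks a free port, sends a few datagrams to itself, and returns what the protocol received. It assumes at least one message is passed in and that loopback delivery does not drop anything.

```python
import asyncio
import socket


class CollectorProtocol(asyncio.DatagramProtocol):
    """Hypothetical protocol that stores every datagram it receives."""

    def __init__(self, expected: int, done: asyncio.Future):
        self.expected = expected  # number of datagrams to wait for
        self.done = done          # resolved once all datagrams have arrived
        self.received = []

    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.received.append(data)
        if len(self.received) >= self.expected and not self.done.done():
            self.done.set_result(None)


async def run_demo(messages):
    """Start a UDP endpoint, send `messages` to it, return the received bytes."""
    loop = asyncio.get_running_loop()
    done = loop.create_future()
    # Port 0 asks the OS for any free port, so no special privileges are needed.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: CollectorProtocol(len(messages), done),
        local_addr=("127.0.0.1", 0),
    )
    try:
        host, port = transport.get_extra_info("sockname")[:2]
        # A plain socket plays the role of the external sender.
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
            for message in messages:
                sock.sendto(message.encode(), (host, port))
        # Give up after 5 seconds instead of hanging if a datagram is lost.
        await asyncio.wait_for(done, timeout=5.0)
        return protocol.received
    finally:
        transport.close()


if __name__ == "__main__":
    print(asyncio.run(run_demo(["first line", "second line"])))
```

Awaiting create_datagram_endpoint() inside a coroutine run by asyncio.run() avoids managing the loop by hand; for a long-running server you would await something that never finishes (for example an asyncio.Event) instead of the done future.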

Upvotes: 10
