Daniyal

Reputation: 341

Avoiding data loss in multithreaded UDP server

I have a script that creates a UDP server which ingests a stream and appends it to a list. Every minute, I take the data ingested so far, clean it and send it to another server. Both operations run on threads that share the same variable.

import socket, time, threading, copy

UDP_IP = "255.255.255.255"
UDP_PORT = 4032

store = []

lock = threading.Lock()

def receive_data():
    global store
    global lock

    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind((UDP_IP, UDP_PORT))

    while True:
        data = s.recv(9999)
        # store data temporarily
        lock.acquire()
        store.append(data)
        lock.release()

def send_data():
    global store
    global lock

    lock.acquire()
    data = copy.deepcopy(store)
    store = []
    lock.release()

    # Clean up, send and put a timer
    threading.Timer(60, send_data).start()

t1 = threading.Thread(target=receive_data, name='Server')
t1.start()

t2 = threading.Thread(target=send_data, name='Sender')
t2.start() 

My question: is this script good enough in terms of avoiding data loss? I'm concerned that holding the lock might make the UDP server wait for access to the variable and miss data sent during that time.

Upvotes: 0

Views: 342

Answers (1)

quamrana

Reputation: 39404

Assuming that your code is like this:

import socket, threading

UDP_IP = "255.255.255.255"
UDP_PORT = 4032

store = []

lock = threading.Lock()

def receive_data():
    global store
    global lock
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.bind((UDP_IP, UDP_PORT))

    while True:
        data = s.recv(9999)
        # store data temporarily
        with lock:  # Note the lock around access to global store
            store.append(data)

def send_data():
    global store
    global lock

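    with lock:
        data = store[:]  # cheap copy of the contents while locked
        store = []

    # Expensive processing of data to send it to another server.
    # process() stands in for your own cleaning/sending code; it runs
    # outside the lock, so the receiver is not blocked while it works.
    process(data)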

    # Clean up, send and put a timer
    threading.Timer(60, send_data).start()

t1 = threading.Thread(target=receive_data, name='Server')
t1.start()

t2 = threading.Thread(target=send_data, name='Sender')
t2.start() 

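then there are no hold-ups as far as reading data is concerned: the lock is held only for the brief copy-and-reset, not while the data is being processed. The socket will be buffering incoming datagrams for you anyway (up to the size of its receive buffer) while the receiver waits for the lock.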
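
As a rough sketch of the same idea, the hand-off can be written as a swap under the lock, and the socket can be asked for a larger receive buffer. The names `add`, `drain` and `make_udp_socket` are hypothetical, and the 1 MiB buffer size and loopback address are assumptions for illustration only; the OS may clamp or adjust the size you request.

```python
import socket
import threading

store = []
lock = threading.Lock()


def make_udp_socket(port, rcvbuf_bytes=1 << 20):
    """Create a UDP socket and request a larger kernel receive buffer.

    Hypothetical helper: the size is only a request, the OS decides
    what it actually grants.
    """
    s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    s.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, rcvbuf_bytes)
    s.bind(("127.0.0.1", port))  # assumed address, use your own
    return s


def add(item):
    """Receiver side: hold the lock only for the append."""
    with lock:
        store.append(item)


def drain():
    """Sender side: swap the shared list for an empty one.

    The lock is held only for the swap, so the receiver is blocked
    for a very short time regardless of how much data has piled up.
    """
    global store
    with lock:
        data, store = store, []
    return data
```

The receiver loop would call `add(s.recv(9999))` and the timer callback would call `drain()` and then do its slow work on the returned list. Datagrams can still be dropped if the receive buffer fills up, but that depends on how fast the receiver loop runs, not on the sender thread's processing.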

Upvotes: 1
