666craig

Reputation: 29

Can I avoid a threaded UDP socket in Python dropping data?

First off, I'm new to Python and learning on the job, so be gentle!

I'm trying to write a threaded Python app for Windows that reads data from a UDP socket (thread-1), writes it to file (thread-2), and displays the live data (thread-3) to a widget (gtk.Image using a gtk.gdk.pixbuf). I'm using queues for communicating data between threads.

My problem is that if I start only threads 1 and 3 (so skip the file writing for now), it seems that I lose some data after the first few samples. After this drop it looks fine. Even by letting thread 1 complete before running thread 3, this apparent drop is still there.

Apologies for the length of code snippet (I've removed the thread that writes to file), but I felt removing code would just prompt questions. Hope someone can shed some light :-)

import socket
import threading
import Queue
import numpy
import gtk
gtk.gdk.threads_init()
import gtk.glade
import pygtk


class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'


class displayWithGTK(threading.Thread):

    def __init__(self, displayDataQueue, image, viewArea):
        threading.Thread.__init__(self)
        self.displayDataQueue = displayDataQueue
        self.image = image
        self.viewWidth = viewArea[0]
        self.viewHeight = viewArea[1]
        self.displayData = numpy.zeros((self.viewHeight, self.viewWidth, 3), dtype=numpy.uint16)

    def run(self):
        scan = 0
        try:
            while True:
                if not scan % self.viewWidth: scan = 0
                buffer = self.displayDataQueue.get(timeout=0.1)
                self.displayData[:, scan, 0] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 1] = numpy.fromstring(buffer, dtype=numpy.uint16)
                self.displayData[:, scan, 2] = numpy.fromstring(buffer, dtype=numpy.uint16)
                gtk.gdk.threads_enter()
                self.myPixbuf = gtk.gdk.pixbuf_new_from_data(self.displayData.tostring(), gtk.gdk.COLORSPACE_RGB,
                                                        False, 8, self.viewWidth, self.viewHeight, self.viewWidth * 3)
                self.image.set_from_pixbuf(self.myPixbuf)
                self.image.show()
                gtk.gdk.threads_leave()
                scan += 1
        except Queue.Empty:
            print 'myDisplay finished!'
            pass


def quitGUI(obj):
    print 'Currently active threads: %s' % threading.enumerate()
    gtk.main_quit()


if __name__ == '__main__':

    # Create socket (IPv4 protocol, datagram (UDP)) and bind to address
    socketUDP = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    host = '192.168.1.5'
    port = 1024
    socketUDP.bind((host, port))

    # Data parameters
    samplesPerScan = 256
    packetsPerSecond = 1200
    packetSize = 512
    duration = 1  # For now, set a fixed duration to log data
    numScans = int(packetsPerSecond * duration)

    # Create array to store data
    data = numpy.zeros((samplesPerScan, numScans), dtype=numpy.uint16)

    # Create queue for displaying from
    readDataQueue = Queue.Queue(numScans)

    # Build GUI from Glade XML file
    builder = gtk.Builder()
    builder.add_from_file('GroundVue.glade')
    window = builder.get_object('mainwindow')
    window.connect('destroy', quitGUI)
    view = builder.get_object('viewport')
    image = gtk.Image()
    view.add(image)
    viewArea = (1200, samplesPerScan)

    # Instantiate & start threads
    myServer = readFromUDPSocket(socketUDP, readDataQueue, packetSize, numScans)
    myDisplay = displayWithGTK(readDataQueue, image, viewArea)

    myServer.start()
    myDisplay.start()

    gtk.gdk.threads_enter()
    gtk.main()
    gtk.gdk.threads_leave()
    print 'gtk.main finished!'

Upvotes: 3

Views: 3661

Answers (5)

Glyph

Reputation: 31890

UDP is, by definition, unreliable. You must not write programs that expect UDP datagrams to always get through.

Packets are dropped all the time in TCP too, but your program does not need to care, because TCP applications cannot process packets; the TCP stack shows your application a stream of bytes. There is a lot of machinery there to make sure that if you send bytes 'ABCD', you will see 'A' 'B' 'C' 'D' on the other end. You may get any possible collection of packets, of course: 'ABC', 'D', or 'AB', 'CD', etc. Or you may just see 'ABC', and then nothing.

TCP isn't "reliable" because it can magically make your network cables never fail or break; the guarantee that it provides is that up until the point where the stream breaks, you will see everything in order. And after the stream breaks, you'll see nothing.

In UDP there is no such guarantee. If you send four UDP datagrams, 'AB', 'CD', 'EF', 'GH', you may receive all of them, or none of them, or half of them, or just one of them. You may receive them in any order. The only guarantee that UDP tries to provide is that you won't see a message with 'ABCD' in it, because those bytes are in different datagrams.
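A minimal sketch of the boundary guarantee, assuming two sockets on the loopback interface (the function name boundary_demo is made up for illustration). Loopback practically never drops or reorders anything, so this only shows that each recv returns exactly one datagram and never a merged 'ABCD'; it says nothing about loss on a real network.

```python
import socket


def boundary_demo():
    """Send 'AB' and 'CD' as separate datagrams and return what recv() yields."""
    receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    receiver.bind(('127.0.0.1', 0))   # port 0: let the OS pick a free port
    receiver.settimeout(1.0)          # do not block forever if something is lost
    sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        for payload in (b'AB', b'CD'):
            sender.sendto(payload, receiver.getsockname())
        # The buffer is far larger than either payload, yet each call
        # still returns a single datagram.
        return [receiver.recv(1024) for _ in range(2)]
    finally:
        sender.close()
        receiver.close()
```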

To sum up: this has nothing to do with Python, or threads, or GTK. It's just a basic fact of life on networks based in physical reality: sometimes the electrical characteristics of your wires are not conducive to getting your messages all the way across them.

You may be able to reduce the complexity of your program by using Twisted, specifically, the listenUDP API, because then you won't be needing to juggle threads or their interaction with GTK: you can just call methods directly on the widget in question from your datagramReceived method. But this won't fix your underlying problem: UDP just drops data sometimes, period. The real solution is to convince your data source to use TCP instead.

Upvotes: 2

666craig

Reputation: 29

It seems that the problem is with the source. There are two issues:

  1. Looking at Wireshark, the source is not consistently transmitting 1200 packets per second. Possibly, as Len pointed out, this is a problem with the outbound stack dropping data. BTW the source is a programmable card with an Ethernet port connected to my machine.

  2. The other issue is that after the first 15 packets or so of data there is always a drop. What I discovered is that if I recv 20 packets in the initialisation part of the readFromUDPSocket thread, I can then read the data fine, e.g.

class readFromUDPSocket(threading.Thread):

    def __init__(self, socketUDP, readDataQueue, packetSize, numScans):
        threading.Thread.__init__(self)
        self.socketUDP = socketUDP
        self.readDataQueue = readDataQueue
        self.packetSize = packetSize
        self.numScans = numScans
        for i in range(0, 20):
            buffer = self.socketUDP.recv(self.packetSize)

    def run(self):
        for scan in range(1, self.numScans + 1):
            buffer = self.socketUDP.recv(self.packetSize)
            self.readDataQueue.put(buffer)
        self.socketUDP.close()
        print 'myServer finished!'

Not sure what this points to?! I think all of this rules out not being able to recv and put fast enough though.

Upvotes: -1

MattH

Reputation: 38245

Edit - Struck out listen/accept sentence, thanks Daniel, I was just coming to remove it when I saw your comment :)

I'd suggest that this is a network programming issue, rather than Python per se.

You've set a packet-per-second rate and a duration to define the number of recv calls you make to your UDP socket. I don't see a listen or accept call to the socket, I'll assume that recv handles that as you say you receive some data. You've not mentioned the generation of the data.

You've defined how many reads you're expecting to make, so I'd assume that the code makes that many receives before exiting, so my conclusion would be that your recv packetSize is insufficient and therefore one read isn't pulling an entire datagram, then the subsequent recv is pulling the next part of the previous datagram.

Can't you look at the data you have received and determine what is missing? What data are you "losing"? How do you know it's lost?

Furthermore, you could use wireshark to verify that your host is actually receiving the data at the same time as verifying the size of the datagrams. Match the capture against the data your recv thread is providing.


Update

You say that you're losing data, but not what it is. I see two possibilities for data-loss:

  • Truncating packets
  • Dropping packets

You've said that the payload size is the same size as that which you are passing to recv, so I'll take it that you're not truncating.

So the factors for dropping packets are a combination of rate of receipt, rate of read-from-receive-buffer and receive-buffer size.

Your calls to Queue.put may be slowing down your rate of read.

So, first determine that you can read 1200 packets per second by modifying readFromUDPSocket to not Queue.put, but count the number of receives and report time taken.
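As a rough sketch of that measurement (the helper name measure_recv_rate is hypothetical, and it assumes a timeout has been set on the socket so the loop cannot hang if the source goes quiet):

```python
import socket
import time


def measure_recv_rate(sock, packet_size, num_packets):
    """Receive up to num_packets datagrams without queueing them.

    Returns (packets received, seconds from start to the last packet).
    """
    received = 0
    start = time.time()
    last = start
    try:
        for _ in range(num_packets):
            sock.recv(packet_size)    # payload is discarded on purpose
            received += 1
            last = time.time()
    except socket.timeout:
        pass                          # source stopped sending; report what we got
    return received, last - start
```

Dividing the count by the elapsed time gives the achieved rate to compare against 1200 packets per second. The clock starts before the first packet arrives, so any wait for the source to begin sending is included in the figure.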

Once you've determined that you can call recv fast enough, the next step is working out what is slowing you down. I suspect it may be your use of Queue; I suggest batching payloads in N-sized groups before placing them on the Queue, so that you're not trying to call put at 1200 Hz.
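A sketch of the batching idea, assuming the consumer is changed to expect a list of payloads per queue item. The function name recv_in_batches and the default batch size of 50 are arbitrary choices for illustration, not measured values.

```python
def recv_in_batches(sock, out_queue, packet_size, num_packets, batch_size=50):
    """Read num_packets datagrams and put them on out_queue in lists of batch_size."""
    batch = []
    for _ in range(num_packets):
        batch.append(sock.recv(packet_size))
        if len(batch) == batch_size:
            out_queue.put(batch)      # one put (one lock acquisition) per batch
            batch = []
    if batch:
        out_queue.put(batch)          # flush the final, possibly shorter, batch
```

With a batch size of 50 at 1200 packets per second, put is called 24 times per second instead of 1200. The trade-off is that the display thread sees data in bursts, up to one batch late.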

Seeing as you want to sustain a rate of 1200 reads per second I don't think you'll get very far by increasing the receive buffer on the socket.

Upvotes: -1

Len Holgate

Reputation: 21644

Firstly; can you set the recv buffer size for the socket? If so, set it to something very large as this will let the UDP stack buffer more datagrams for you.
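In Python the receive buffer is set with the SO_RCVBUF socket option. A minimal sketch follows; the helper name enlarge_recv_buffer is made up, and any size you pass is a guess to be tuned. The operating system is free to round or cap the request, so the function reads the value back rather than trusting it.

```python
import socket


def enlarge_recv_buffer(sock, requested_bytes):
    """Ask the OS for a larger receive buffer and return the size it reports."""
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, requested_bytes)
    # The reported size may differ from the request (capped or adjusted by the OS).
    return sock.getsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF)
```

For example, calling enlarge_recv_buffer(socketUDP, 1024 * 1024) right after creating the socket and checking the returned figure shows how much buffering was actually granted.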

Secondly; if you can use asynchronous I/O then post multiple recv calls at once (again this allows the stack to service more datagrams before it starts to drop them).

Thirdly; you could try unrolling your loop a little and reading multiple datagrams before placing them in your queue; could the locking on the queue be causing the recv thread to run slowly??

Finally; the datagrams may be being dropped elsewhere on the network, and there may be nothing that you can do; that's the U in UDP...

Upvotes: -1

lunixbochs

Reputation: 22415

UDP doesn't verify the target received it (like TCP does) - you must implement retransmission and such in your applications if you want to ensure all of the data arrives. Do you control the sending UDP source?
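If you do control the source, the first step towards retransmission is knowing which datagrams went missing. The sketch below assumes a hypothetical packet layout in which the sender prepends a 4-byte big-endian sequence number to every payload; nothing in the question says the card does this, so HEADER, split_packet and find_missing are illustrative names only.

```python
import struct

# Hypothetical header: one unsigned 32-bit sequence number, network byte order.
HEADER = struct.Struct('!I')


def split_packet(packet):
    """Return (sequence number, payload) for a packet with the assumed header."""
    (seq,) = HEADER.unpack_from(packet)
    return seq, packet[HEADER.size:]


def find_missing(seqs):
    """Return the sequence numbers absent between the lowest and highest seen."""
    seen = set(seqs)
    if not seen:
        return []
    return [n for n in range(min(seen), max(seen) + 1) if n not in seen]
```

The receiver could then report the missing numbers back to the sender for a resend, or simply log them to confirm whether loss really happens in the first 15 packets or so.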

Upvotes: 4
