Swoldier

Reputation: 153

Receiving socket data in one thread, writing the data in another -- python

I'm currently writing a Python program to receive data from a TCP or UDP socket and then write the data to a file. Right now, my program is I/O bound because it writes each datagram to the file as it comes in (I'm doing this for very large files, so the slowdown is considerable). With that in mind, I'd like to try receiving the data from the socket in one thread and writing it in a different thread. So far, I've come up with the following rough draft. At the moment, it only writes a single data chunk (512 bytes) to the file.

import socket
import threading

# `sock` (a bound socket) and `buf` (the 512-byte chunk size) are set up elsewhere
f = open("t1.txt","wb")
def write_to_file(data):
    f.write(data)

def recv_data():
    dataChunk, addr = sock.recvfrom(buf) #THIS IS THE DATA THAT GETS WRITTEN
    try:
        w = threading.Thread(target = write_to_file, args = (dataChunk,))
        threads.append(w)
        w.start()
        while(dataChunk):
            sock.settimeout(4)
            dataChunk,addr = sock.recvfrom(buf)
    except socket.timeout:
        print("Timeout")
        sock.close()
        f.close()

threads = []
r = threading.Thread(target=recv_data)
threads.append(r)
r.start()

I imagine I'm doing something wrong; I'm just not sure what the best way to use threading is. Right now, my issue is that I have to supply an argument when I create my thread, but the value of that argument doesn't change to reflect the new data chunks that come in. However, if I put the line w = threading.Thread(target=write_to_file, args=(dataChunk,)) inside the while(dataChunk) loop, wouldn't I be creating a new thread each iteration?
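To make the behavior described above concrete: the `args` tuple is built once, when `threading.Thread(...)` is called, so rebinding the variable afterwards has no effect on what the thread receives. And yes, calling `threading.Thread` inside the loop creates a separate thread per chunk, and those threads may run (and write) in any order. The sketch below is Python 3, and `write_chunk` is a hypothetical stand-in for `write_to_file`.

```python
import threading

received = []  # records the chunks the worker threads actually saw

def write_chunk(chunk):
    # `chunk` is the object that was placed in `args` when the Thread was created
    received.append(chunk)

data_chunk = b"first"
t = threading.Thread(target=write_chunk, args=(data_chunk,))
data_chunk = b"second"  # rebinding the name does not change the args tuple already stored
t.start()
t.join()
print(received)  # [b'first']

# Creating the Thread inside a loop really does make one new thread per chunk
threads = []
for chunk in (b"a", b"b", b"c"):
    w = threading.Thread(target=write_chunk, args=(chunk,))
    threads.append(w)
    w.start()
for w in threads:
    w.join()
print(len(threads))  # 3
```

The order in which `b"a"`, `b"b"` and `b"c"` land in `received` is not guaranteed, which is one reason to prefer a single long-lived writer thread.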

Also, for what it's worth, this is just my small proof-of-concept for using separate receive and write threads. This is not the larger program that should ultimately make use of this concept.

Upvotes: 1

Views: 3290

Answers (1)

MisterMiyagi

Reputation: 52099

You need a buffer that the reading thread writes to and the writing thread reads from. A deque from the collections module is well suited for this: it supports appends and pops from either end in O(1) time, and those operations are thread-safe, so no extra locking is needed.
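A small sketch of the FIFO discipline used below: the producer calls `appendleft()` and the consumer calls `pop()`, so the oldest chunk always comes out first. Integers stand in for datagrams, and `producer` is a hypothetical name used only for illustration.

```python
import collections
import threading

buffer = collections.deque()

def producer(n):
    for i in range(n):
        buffer.appendleft(i)  # new items go on the LEFT end

t = threading.Thread(target=producer, args=(5,))
t.start()
t.join()

drained = []
while buffer:
    drained.append(buffer.pop())  # oldest items come off the RIGHT end
print(drained)  # [0, 1, 2, 3, 4]
```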

So, instead of passing dataChunk to your thread(s), pass them the shared buffer.

import collections  # for the buffer
import socket  # for the socket and socket.timeout
import threading
import time  # to ease polling

def write_to_file(path, buffer, terminate_signal):
    with open(path, 'wb') as out_file:  # close file automatically on exit
        # keep going until the end is signaled AND the buffer has been drained
        while not terminate_signal.is_set() or buffer:
            try:
                data = buffer.pop()  # pop from RIGHT end of buffer
            except IndexError:
                time.sleep(0.5)  # buffer empty: wait for new data
            else:
                out_file.write(data)  # write a chunk

def read_from_socket(sock, buffer, terminate_signal, bufsize=512):
    sock.settimeout(4)
    try:
        while True:
            data, _ = sock.recvfrom(bufsize)
            buffer.appendleft(data)  # append to LEFT of buffer
    except socket.timeout:
        print("Timeout")
    finally:
        terminate_signal.set()  # signal writer that we are done (also on unexpected errors)
        sock.close()

# UDP socket as in the question; port 5005 is a placeholder, use whatever the sender targets
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind(("", 5005))

buffer = collections.deque()  # buffer for reading/writing
terminate_signal = threading.Event()  # shared signal
threads = [
    threading.Thread(target=read_from_socket, kwargs=dict(
        sock=sock,
        buffer=buffer,
        terminate_signal=terminate_signal
    )),
    threading.Thread(target=write_to_file, kwargs=dict(
        path="t1.txt",
        buffer=buffer,
        terminate_signal=terminate_signal
    ))
]
for t in threads:  # start both threads
    t.start()
for t in threads:  # wait for both threads to finish
    t.join()
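The polling loop above sleeps for up to half a second whenever the buffer is empty. A common alternative, swapped in here rather than taken from the answer, is `queue.Queue`, whose `get()` blocks until an item arrives, with a sentinel value (a hypothetical `SENTINEL = None`) marking the end of the stream instead of a separate `Event`. This is a Python 3 sketch (the module is named `Queue` in Python 2). The function names mirror the answer's, and the loopback demo with a 0.5-second timeout exists only so the sketch runs on its own.

```python
import os
import queue
import socket
import tempfile
import threading

SENTINEL = None  # hypothetical end-of-stream marker; recvfrom() always returns bytes, never None

def read_from_socket(sock, chunks, bufsize=512, timeout=4.0):
    sock.settimeout(timeout)
    try:
        while True:
            data, _ = sock.recvfrom(bufsize)
            chunks.put(data)  # hand the chunk to the writer thread
    except socket.timeout:
        pass  # no data for `timeout` seconds: treat as end of transfer
    finally:
        chunks.put(SENTINEL)  # always tell the writer to stop
        sock.close()

def write_to_file(path, chunks):
    with open(path, "wb") as out_file:
        while True:
            data = chunks.get()  # blocks until a chunk is available; no polling
            if data is SENTINEL:
                break
            out_file.write(data)

# Demo on the loopback interface: port 0 lets the OS pick a free port.
receiver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
receiver.bind(("127.0.0.1", 0))
sender = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
for part in (b"hello ", b"world"):
    sender.sendto(part, receiver.getsockname())  # buffered by the bound receiver until read
sender.close()

out_path = os.path.join(tempfile.mkdtemp(), "t1.txt")
chunks = queue.Queue()
threads = [
    threading.Thread(target=read_from_socket, args=(receiver, chunks), kwargs={"timeout": 0.5}),
    threading.Thread(target=write_to_file, args=(out_path, chunks)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

Putting the sentinel in a `finally` block means the writer exits even if the reader fails, and the blocking `get()` removes the trade-off between sleep length and CPU use that the polling version has.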

Upvotes: 3
