Swatcat

Reputation: 37

Python Twisted: sending a large file across the network

I am trying to send a file across the network using Twisted with the LineReceiver protocol. The issue I am seeing is that when I read a binary file and try to send it in chunks, the chunks simply don't get sent.

I am reading the file using:

import json
import math
import time
import threading
from twisted.internet import reactor, threads
from twisted.protocols.basic import LineReceiver
from twisted.internet import protocol

MaximumMsgSize = 15500

trySend = True
connectionToServer = None

class ClientInterfaceFactory(protocol.Factory):

    def buildProtocol(self, addr):
        return ClientInterfaceProtocol()

class ClientInterfaceProtocol(LineReceiver):

    def connectionMade(self):
        connectionToServer = self

    def _DecodeMessage(self, rawMsg):
        header, body = json.loads(rawMsg)   
        return (header, json.loads(body))

    def ProcessIncomingMsg(self, rawMsg, connObject):
        # Decode raw message.
        decodedMsg = self._DecodeMessage(rawMsg)

        self.ProccessTransmitJobToNode(decodedMsg, connObject)

    def _BuildMessage(self, id, msgBody = {}):
        msgs = []

        fullMsgBody = json.dumps(msgBody)
        msgBodyLength = len(fullMsgBody)

        totalParts = 1 if msgBodyLength <= MaximumMsgSize else \
            int(math.ceil(msgBodyLength / MaximumMsgSize))

        startPoint = 0
        msgBodyPos = 0

        for partNo in range(totalParts):
            msgBodyPos = (partNo + 1) * MaximumMsgSize

            header = {'ID' : id, 'MsgParts' : totalParts,
                'MsgPart' : partNo }
            msg = (header, fullMsgBody[startPoint:msgBodyPos])
            jsonMsg = json.dumps(msg)       

            msgs.append(jsonMsg)
            startPoint = msgBodyPos

        return (msgs, '')

    def ProccessTransmitJobToNode(self, msg, connection):
        rootDir = '../documentation/configs/Wooster'

        exportedFiles = ['consoleLog.txt', 'blob.dat']
        params = {
            'Status' : 'buildStatus',
            'TaskID' : 'taskID',
            'Name' : 'taskName',
            'Exports' : len(exportedFiles),
            }
        msg, statusStr = self._BuildMessage(101, params)
        connection.sendLine(msg[0])

        for filename in exportedFiles:
            with open (filename, "rb") as exportFileHandle:
                data = exportFileHandle.read().encode('base64')

            params = {
                ExportFileToMaster_Tag.TaskID : taskID,
                ExportFileToMaster_Tag.FileContents : data,
                ExportFileToMaster_Tag.Filename : filename
            }
            msgs, _ = self._BuildMessage(MsgID.ExportFileToMaster, params)          
            for m in msgs: 
                connection.sendLine(m)

    def lineReceived(self, data):
        threads.deferToThread(self.ProcessIncomingMsg, data, self)


def ConnectFailed(reason):
    print 'Connection failed..'
    reactor.callLater(20, reactor.callFromThread, ConnectToServer)

def ConnectToServer():
    print 'Connecting...'
    from twisted.internet.endpoints import TCP4ClientEndpoint
    endpoint = TCP4ClientEndpoint(reactor, 'localhost', 8181)

    deferItem = endpoint.connect(factory)
    deferItem.addErrback(ConnectFailed)

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()

reactor.callFromThread(ConnectToServer)

factory = ClientInterfaceFactory()
protocol = ClientInterfaceProtocol()

while 1:
    time.sleep(0.01)

    if connectionToServer == None: continue

    if trySend == True:
        protocol.ProccessTransmitJobToNode(None, None)
        trySend = False

Is there something I am doing wrong? If a single write occurs then the file is sent; it's when the write is multi-part, or there is more than one file, that it struggles.

Note: I have updated the question with a crude piece of sample code in the hope it makes sense.

Upvotes: 0

Views: 479

Answers (1)

Jean-Paul Calderone

Reputation: 48335

_BuildMessage returns a two-tuple: (msgs, '').

Your network code iterates over this:

msgs = self._BuildMessage(MsgID.ExportFileToMaster, params)

for m in msgs: 

So your network code first tries to send a list of JSON-encoded strings and then tries to send the empty string. It most likely raises an exception, because you cannot send a list of anything using sendLine. If you aren't seeing the exception, you've forgotten to enable logging. You should always enable logging so you can see any exceptions that occur.
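A minimal sketch of the corrected framing, assuming Python 3 (the question's code is Python 2). It unpacks nothing by accident (it returns just the list of lines), rounds the part count up with integer arithmetic, and encodes each part to bytes, because on Python 3 sendLine concatenates the line with a bytes delimiter. The integer ceiling matters on Python 2 as well: there msgBodyLength / MaximumMsgSize already truncates when both operands are ints, so math.ceil has nothing to round up and the final partial chunk is silently dropped. build_message_parts and reassemble are hypothetical helper names; the header keys mirror the question's.

```python
import base64
import json

MAX_PART_SIZE = 15500  # characters of the JSON body per part, as in the question


def build_message_parts(msg_id, body):
    """Split json.dumps(body) into framed parts and return them as bytes lines."""
    full_body = json.dumps(body)
    # Integer ceiling division: no float is involved, so no chunk is lost.
    total_parts = max(1, -(-len(full_body) // MAX_PART_SIZE))
    lines = []
    for part_no in range(total_parts):
        start = part_no * MAX_PART_SIZE
        header = {'ID': msg_id, 'MsgParts': total_parts, 'MsgPart': part_no}
        chunk = full_body[start:start + MAX_PART_SIZE]
        # json.dumps escapes CR/LF inside strings, so a part never contains the
        # line delimiter; sendLine() on Python 3 needs bytes, hence encode().
        lines.append(json.dumps([header, chunk]).encode('utf-8'))
    return lines


def reassemble(lines):
    """Receiver side: order the parts by MsgPart, join them, decode the body once."""
    parts = sorted((json.loads(line) for line in lines),
                   key=lambda p: p[0]['MsgPart'])
    return json.loads(''.join(chunk for _header, chunk in parts))


if __name__ == '__main__':
    # Binary data has to become text before it goes into JSON; base64.b64encode
    # replaces the Python 2-only .encode('base64') used in the question.
    contents = base64.b64encode(bytes(range(256)) * 200).decode('ascii')
    body = {'Filename': 'blob.dat', 'FileContents': contents}
    lines = build_message_parts(102, body)
    assert reassemble(lines) == body
    print(len(lines), 'parts')
```

Inside the protocol, each returned line would be passed to sendLine on the reactor thread, and the receiver would collect parts until MsgParts of them have arrived before decoding. Each line, header included, should also stay below the receiving LineReceiver's MAX_LENGTH (16384 bytes by default): a longer line makes the receiver call lineLengthExceeded, which by default drops the connection. Because each chunk is JSON-encoded a second time, any quotes or backslashes in it are escaped, so a line can end up somewhat longer than MAX_PART_SIZE.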

Also, you're using time.sleep and you shouldn't do this in a Twisted-based program. If you're doing this to try to avoid overloading the receiver, you should use TCP's native backpressure instead by registering a producer which can receive pause and resume notifications. Regardless, time.sleep (and your loop over all the data) will block the entire reactor thread and prevent any progress from being made. The consequence is that most of the data will be buffered locally before being sent.

Also, your code calls LineReceiver.sendLine from a non-reactor thread. This has undefined results but you can probably count on it to not work.

This loop runs in the main thread:

while 1:
    time.sleep(0.01)

    if connectionToServer == None: continue

    if trySend == True:
        protocol.ProccessTransmitJobToNode(None, None)
        trySend = False

while the reactor runs in another thread:

netThread = threading.Thread(target=reactor.run, kwargs={"installSignalHandlers": False})
netThread.start()

ProccessTransmitJobToNode calls sendLine directly:

def ProccessTransmitJobToNode(self, msg, connection):
    rootDir = '../documentation/configs/Wooster'

    exportedFiles = ['consoleLog.txt', 'blob.dat']
    params = {
        'Status' : 'buildStatus',
        'TaskID' : 'taskID',
        'Name' : 'taskName',
        'Exports' : len(exportedFiles),
        }
    msg, statusStr = self._BuildMessage(101, params)
    connection.sendLine(msg[0])

You should probably remove the use of threading from the application entirely. Time-based events are better managed using reactor.callLater; your main-thread loop effectively generates a call to ProccessTransmitJobToNode a hundred times a second (modulo the effect of the trySend flag).

You may also want to take a look at https://github.com/twisted/tubes as a better way to manage large amounts of data with Twisted.

Upvotes: 2
