Reputation: 91
I'm currently developing a Python 3.4 network streaming app, and I'm seeing some strange behavior with my socket. (Target: Python 3.3 compatibility if possible.)
Definition: when I talk about a stream, a UDP stream is meant.
The problem
While sending, the socket.send operation sometimes starts to take 1-3 ms; as I describe in more detail below, the transfer target is much higher than that allows. I found other threads here about speed problems, but those managed to send 200k packets a second, and they only sent "A". In my case each packet is 1500 bytes including the UDP and IP headers added by the socket. Please see my explanations below if the problem is not clear at this point.
Question
Does anyone have an idea why these delays occur, or how to speed up sending enough to reach real time?
def _transfer(self):
    self.total_num_samps_sent = 0
    self.sequence_out = 0
    self.send_in_progress = True
    send = self.udp_socket.send
    for i in range(len(self.packed_streams)):
        stream_data, stream_samps, stream_seq = self.packed_streams[i]
        # commit the samples
        start_try_send_time = monotonic()
        while not self.ready():
            if monotonic() - start_try_send_time > self.timeout > 0:
                # timeout; if timeout == 0 wait forever
                return 0
        self.sequence_out = stream_seq
        # ######################
        # Here is the bottleneck
        # ######################
        s = monotonic()
        send(stream_data)
        e = monotonic()
        if e - s > 0:
            print(str(i) + ': ' + str(e - s))
        # #####################
        # end measure monotonic
        # #####################
        self.total_num_samps_sent += stream_samps
    self.send_in_progress = False
self.packed_streams contains a list of tuples (data_in_bytes, number_samples_in_this_stream, sequence_out). The function self.ready() returns True once the target has ACK'ed enough of the packets sent so far (i.e. it has free RAM).
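For illustration, a minimal sketch of how such a list could be built (pack_vrt_payload and sample_chunks are placeholder names for this sketch, not names from my actual code):

# Hypothetical sketch: shows only the (bytes, num_samples, sequence) tuple layout described above.
self.packed_streams = []
for seq, samples in enumerate(sample_chunks):      # sample_chunks: assumed iterable of sample blocks
    payload = pack_vrt_payload(samples, seq)       # placeholder: returns the packed bytes (~1500 B on the wire)
    self.packed_streams.append((payload, len(samples), seq))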
The bottleneck marked above is profiled in more detail; see a little further down.
self.target = (str(self.ip_target), port)
self.udp_socket = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.udp_socket.settimeout(self.socket_timeout)
try:
    self.udp_socket.bind((str(self.ip_own), 0))
except OSError as os_error:
    error = ('OS Error: {0}'.format(os_error)
             + linesep + 'IP src: ' + str(self.ip_own)
             + linesep + 'IP dst: ' + str(self.ip_usrp)
             + linesep + 'Port: {0}'.format(port))
    exit(error)
self.udp_socket.connect(self.target)
# setting the socket to non-blocking does not help
# self.udp_socket.setblocking(False)
The send function (1st code block) runs in a separate thread, and the UDPFlowControl spawns another thread too, operating on the same socket as the send streamer (the Streamer inherits from the FlowControl and uses its ready state).
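Roughly, the thread setup looks like this (the constructor arguments are simplified; this is only a sketch of the structure, not my exact startup code):

import threading

# sketch: SendStreamer inherits from UDPFlowControl and both threads share its udp_socket
streamer = SendStreamer(ip_own, ip_target, port)                        # constructor args assumed
flow_thread = threading.Thread(target=streamer._worker, daemon=True)   # FlowControl receive loop
send_thread = threading.Thread(target=streamer._send)                  # cProfile wrapper around _transfer()
flow_thread.start()
send_thread.start()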
def _worker(self):
    """
    * Receive Loop
    * - update flow control condition count
    * - put async message packets into queue
    """
    self.send_here_am_i()
    while 1:
        ready = select([self.udp_socket], [], [], self.socket_timeout)
        if ready[0]:
            try:
                data_in = self.udp_socket.recv(2048)
            except:
                # ignore timeout/error buffers
                continue
            # with suppress(Exception): #ToDo Reenable after test is done
            bytes_in = len(data_in)
            self.data_received += bytes_in
            # extract the vrt header packet info
            vrt = VRTImplementation()
            vrt.num_packet_words32 = int(bytes_in / ctypes.sizeof(ctypes.c_uint32))
            if not vrt.unpack_header(data_in, VRTEndian.BIG_ENDIAN):
                continue
            # handle a tx async report message
            if vrt.stream_id32 == Defaults.ASYNC_SID and vrt.packet_type != PacketType.DATA:
                # fill in the async metadata
                metadata = MetadataAsync()
                metadata.load_from_vrt(vrt, data_in[vrt.num_header_words32 * 4:],
                                       self.tick_rate)
                # catch the flow control packets and react
                if metadata.event_code == EventCode.FLOW_CONTROL:
                    self.sequence_in = \
                        unpack('>I', data_in[vrt.num_header_words32 * 4 + 4:vrt.num_header_words32 * 4 + 8])[0]
                    continue
                self.async_msg_fifo.append(metadata)
            else:
                # TODO: unknown packet
                pass
def ready(self):
    """
    Check whether fewer ACKs are outstanding than the maximum allowed
    :returns bool: True if the device can accept more data
    """
    return self.sequence_out - self.sequence_in < self.max_sequence_out
<< Removed old benchmark >> see the edit history if you need this information!
As mentioned above, the monotonic profiling is the reason for my question. As you can see, times of 0 are ignored. The output looks like this (the stream contains data for 5 seconds (2754,8 bytestreams to send), each with a resulting size of 1500 bytes in Wireshark):
Send: 445.40K of 5.00M, Sending: True @ monotonic time: 44927.0550
1227: 0.01599999999598367
1499: 0.01599999999598367
1740: 0.014999999999417923
1883: 0.01600000000325963
Send: 724.18K of 5.00M, Sending: True @ monotonic time: 44927.3200
....
The first number is the index of the delayed packet. The second number is the monotonic time difference for this delay. Not shown here, but in my log I found timings like 7582: 0.030999999995401595 and sometimes much higher, around 0.06...
The lines starting with Send come from the main thread writing the current state to the console. After writing, it sleeps for 250 ms.
My problem is that the system currently runs at only 1/25 of the target speed and already shows these hiccups; as you can see in the cProfile output, it takes nearly 30 seconds to send a 5-second stream. The target speed would be 68870 packets/s at 1500 bytes each, which is ~98.5 MByte/s including overhead, against the GbE limit of 125 MByte/s.
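For reference, the quoted numbers work out like this (just a sanity check of the arithmetic above; the difference between binary MiB and decimal MB explains the ~98.5 vs. ~103 figures):

PACKET_SIZE = 1500                  # bytes on the wire per packet (incl. UDP/IP headers)
PACKET_RATE = 68870                 # target packets per second

bytes_per_second = PACKET_SIZE * PACKET_RATE    # 103,305,000 B/s
print(bytes_per_second / 2 ** 20)               # ~98.5 MiB/s
print(bytes_per_second / 1e6)                   # ~103.3 MB/s, vs. the GbE raw limit of 125 MB/s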
This is a single-target application, normally attached directly to the device over a network cable, without any router, switch or anything else in between. So the network belongs exclusively to this app and the device.
What i have done so far:
In all tests keep in mind that the print command is only there for debugging. Half of the monotonic calls are there for debugging purposes too.
<< Removed old benchmark >> see the edit history if you need this information!
Running on Windows 7 x64 with Python 3.4.2 on a Core i7-2630QM with 8 GB RAM.
<< Removed old benchmark >> see the edit history if you need this information!
First, because I can answer it quickly: cProfile runs inside the thread. The _worker is still an unprofiled second thread; because of the low time it spends waiting to become ready (~0.05 s in sum) I assumed it runs fast enough. The _send function is the thread entry point and mostly a wrapper to be able to cProfile this thread.
def _send(self):
    profile = cProfile.Profile()
    profile.enable()
    self._transfer()
    profile.disable()
    profile.print_stats()
Disabling the timeouts and rerunning the profiling will need 1 or 2 more days; I am currently cleaning up code, because there are still threads left in the background that stay in a suspended state (with 250 ms sleeps), and I think it's not a problem to let them die and respawn on usage. When this is done I will retry the tests. The more I think about it, the GIL looks like the evil here. Possibly it's the unpacking of incoming packets within the flow control and the switching between threads that takes some time and causes these hiccups. (If I understand the GIL correctly, only one thread can execute Python code at a time, but I wonder why it always hits the socket action instead of splitting up between the ready and send calls in a more equal way, like 40/60 or 50/50.) So the futures package is on my to-do list to get real multi-core use with processes, as sketched below. To test this out I will make ready() permanently return True and have the FlowControl thread either not start or return in its first statement.
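A minimal sketch of that idea, assuming only the 32-bit ACK sequence number has to flow back to the sender (iter_flow_control_packets is a hypothetical generator, the constructor arguments are illustrative, and the flow-control process would need its own receive socket, which is not shown):

import multiprocessing as mp

def flow_control_process(sock_args, sequence_in):
    # Runs in its own interpreter, so the VRT unpacking no longer competes
    # with send() for the GIL; only the latest ACK'ed sequence is shared back.
    fc = UDPFlowControl(*sock_args)                # would open its own socket here (args assumed)
    for seq in fc.iter_flow_control_packets():     # hypothetical: yields received sequence numbers
        sequence_in.value = seq

# on Windows this setup has to run under an `if __name__ == '__main__':` guard
sequence_in = mp.Value('I', 0)                     # shared unsigned int, read by ready()
proc = mp.Process(target=flow_control_process,
                  args=((ip_own, port), sequence_in), daemon=True)
proc.start()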
The target of this program is to run on Linux, Windows, Mac and Unix.
First, about threads: they have no priority, as mentioned here: Controlling scheduling priority of python threads? I believe there is no way to change that. The core Python runs on is at 25% max, and the overall system load is around 10% while the debugger runs.
The run with select was only a test. I removed the select code from the send routine and tested with and without timeouts:
<< Removed old benchmark >> see the edit history if you need this information!
In this example I killed all threads instead of sending them to sleep, and the main thread sleeps for longer. Without FlowControl @ 5M:
41331 function calls in 2.935 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 2.007 2.007 2.935 2.935 SendStreamer.py:297(_transfer)
13776 0.005 0.000 0.005 0.000 UDPFlowControl.py:52(ready)
1 0.000 0.000 0.000 0.000 {built-in method len}
13776 0.007 0.000 0.007 0.000 {built-in method monotonic}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
13776 0.915 0.000 0.915 0.000 {method 'send' of '_socket.socket' objects}
Here it spends more time waiting for the device than in send:
68873 function calls in 5.245 seconds
Ordered by: standard name
ncalls tottime percall cumtime percall filename:lineno(function)
1 4.210 4.210 5.245 5.245 SendStreamer.py:297(_transfer)
27547 0.030 0.000 0.030 0.000 UDPFlowControl.py:52(ready)
1 0.000 0.000 0.000 0.000 {built-in method len}
27547 0.011 0.000 0.011 0.000 {built-in method monotonic}
1 0.000 0.000 0.000 0.000 {method 'disable' of '_lsprof.Profiler' objects}
13776 0.993 0.000 0.993 0.000 {method 'send' of '_socket.socket' objects}
Still open: splitting things up into processes. I am still refactoring the class structure towards process usage (I think by the end of May at the latest I will have some new results to add). During some more detailed benchmarking I found that the 2nd thread (the VRT unpacking) takes nearly as long as each hiccup lasts. With processes this should no longer be a possible reason for the slowdowns.
I hope all the required information is here; if I forgot something, please ask!
[Edit1] Added information to the "What I have done so far" list
[Edit2] Added cProfile results from the 2nd test system (Manjaro)
[Edit3] Added information about how cProfile runs.
[Edit4] More cProfile results + answer about threads
[Edit5] Removed old benchmarks
Upvotes: 9
Views: 2989
Reputation: 11781
I can confirm this on Linux, run as an unprivileged user, with Python 2.
I don't think there's much you can do:
# timing code:
In [16]: @contextlib.contextmanager
....: def timeit():
....: st = time.time()
....: yield
....: en = time.time()
....: b = int(math.log10(en - st))
....: data.setdefault(b, 0)
....: data[b] += 1
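# (sketch, not in the original answer: the bucket counts below were presumably
#  collected by wrapping roughly a million sends in timeit(), e.g.
#      data = {}
#      payload = b'x' * 1500
#      for _ in range(1000000):
#          with timeit():
#              sock.send(payload)
#  where sock is the UDP socket under test)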
# Thus, timing data means:
-6: number of times send took between 0.00000011 and 0.000001s
-5: 0.0000011 ~ 0.00001
-4: 0.000011 ~ 0.0001
-3: 0.00011 ~ 0.001 (up to millisecond)
-2: 0.0011 ~ 0.01 (1..10ms)
# Regular blocking socket
{-6: 2807, -5: 992126, -4: 5049, -3: 18}
# Non-blocking socket
{-6: 3242, -5: 991767, -4: 4970, -3: 20, -2: 1}
# socket with timeout=0
{-6: 2249, -5: 992994, -4: 4749, -3: 8}
# socket with timeout=1
{-5: 994259, -4: 5727, -3: 8, -2: 6}
It looks like the tail of this distribution is exponential.
I also tried a larger send buffer and adding an occasional time.sleep
to give the kernel time to send our queued packets, and that didn't help. That makes sense, since the non-blocking socket also gets the occasional slow send.
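(For reference, a minimal sketch of the send-buffer change, with an arbitrary size; the kernel may clamp the value, e.g. to net.core.wmem_max on Linux:)

import socket

sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 4 * 1024 * 1024)   # request ~4 MiB
print(sock.getsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF))             # check what was actually granted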
I also tried waiting explicitly for the send queue to be empty, per the outq
function at http://www.pycopia.net/_modules/pycopia/socket.html, and that didn't change the distribution either.
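(That check is Linux-only; a minimal equivalent of such an outq helper looks roughly like this. SIOCOUTQ reports the number of bytes still sitting in the socket's send queue:)

import fcntl
import struct

SIOCOUTQ = 0x5411   # Linux-specific ioctl (same value as TIOCOUTQ)

def outq(sock):
    """Return the number of bytes still queued in the kernel send buffer (Linux only)."""
    buf = fcntl.ioctl(sock.fileno(), SIOCOUTQ, struct.pack('i', 0))
    return struct.unpack('i', buf)[0]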
Upvotes: 2