Reputation: 526
I'm trying to start a process as below, but I'm getting the following errors on the start command.
import pyshark
from multiprocessing import Process
import pandas as pd

def get_data(cap, df):
    for packet1 in cap.sniff_continuously(packet_count=100):
        print("ssdp")
        print(len(packet1))
        df.append(len(packet1))
        print(df.size)

if __name__ == '__main__':
    cap1 = pyshark.LiveCapture(interface='1', display_filter='ssdp')
    df_sspd = pd.DataFrame()
    sspd_p = Process(target=get_data, args=(cap1, df_sspd))
    sspd_p.start()
The 1st:
File "C:\Users\BMWE\anaconda3\lib\logging\__init__.py", line 1727, in __reduce__
raise pickle.PicklingError('logger cannot be pickled')
PicklingError: logger cannot be pickled
and the 2nd:
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "C:\Users\BMWE\anaconda3\lib\multiprocessing\spawn.py", line 116, in spawn_main
exitcode = _main(fd, parent_sentinel)
File "C:\Users\BMWE\anaconda3\lib\multiprocessing\spawn.py", line 126, in _main
self = reduction.pickle.load(from_parent)
EOFError: Ran out of input
Searching the web, it seems there are some similar occurrences, but the solution is not clear to me.
Upvotes: 0
Views: 4717
Reputation: 25
The problem for me was that I was sending a running object, so I just instantiated the object inside the target function. Check the same with cap1 and df_sspd.
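For example, a minimal sketch of that fix applied to the question's code (the interface and filter are carried over from the question; the pd.concat call is my substitution for the question's df.append, which returns a new frame rather than mutating in place):

import pyshark
import pandas as pd
from multiprocessing import Process

def get_data():
    # both objects are created inside the child process,
    # so nothing unpicklable has to cross the process boundary
    cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')
    df = pd.DataFrame()
    for packet1 in cap.sniff_continuously(packet_count=100):
        df = pd.concat([df, pd.DataFrame([len(packet1)])], ignore_index=True)
        print(df.size)

if __name__ == '__main__':
    sspd_p = Process(target=get_data)
    sspd_p.start()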
Upvotes: 0
Reputation: 142859
multiprocessing uses pickle to send arguments to processes, but the problem is that pickle can send normal data, not a running object like cap.

I would rather create cap directly inside the functions and send only the display_filter.
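You can see the same failure without Process at all; a minimal sketch (the error message is the one from the question's own traceback):

import pickle
import pyshark

cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')

# serializing the live object fails the same way Process.start() does,
# because the capture holds a logger (and other runtime state)
# that pickle cannot handle
try:
    pickle.dumps(cap)
except Exception as e:  # PicklingError('logger cannot be pickled') in the question's traceback
    print('cannot pickle:', type(e).__name__, e)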
I tested it with two functions which do the same thing, and it seems they can work at the same time and both get the same data, so there is no problem with sharing data from the interface.
from multiprocessing import Process
import pyshark

def get_data_1():
    cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')
    data = []
    for packet1 in cap.sniff_continuously(packet_count=100):
        print("1 >", len(packet1))
        data.append(len(packet1))
    print(len(data))

def get_data_2():
    #cap = pyshark.LiveCapture(interface='1', display_filter='udp.port==12345')
    cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')
    data = []
    for packet1 in cap.sniff_continuously(packet_count=100):
        print("2 >", len(packet1))
        data.append(len(packet1))
    print(len(data))

if __name__ == '__main__':
    p1 = Process(target=get_data_1)
    p2 = Process(target=get_data_2)
    p1.start()
    p2.start()
    print('Press Ctrl+C to stop')
If the functions are going to be similar, then I would create one function and run it with different arguments:
from multiprocessing import Process
import pyshark

def get_data(my_filter, my_count, prefix="1 >"):
    cap = pyshark.LiveCapture(interface='1', display_filter=my_filter)
    data = []
    for packet1 in cap.sniff_continuously(packet_count=my_count):
        print(prefix, len(packet1))
        data.append(len(packet1))
    print(len(data))

if __name__ == '__main__':
    p1 = Process(target=get_data, args=('ssdp', 100, '1 >'))
    #p2 = Process(target=get_data, args=('udp.port==12345', 100, '2 >'))
    p2 = Process(target=get_data, args=('ssdp', 100, '2 >'))
    p1.start()
    p2.start()
    print('Press Ctrl+C to stop')
EDIT:
There is also a problem with returning data. I use Queue for this. Because the processes may finish the job in a different order, I use process_number to sort the data back into the correct order.
from multiprocessing import Process, Queue
import pyshark

def get_data(process_number, queue, my_filter, my_count, prefix="1 >"):
    cap = pyshark.LiveCapture(interface='1', display_filter=my_filter)
    data = []
    for packet1 in cap.sniff_continuously(packet_count=my_count):
        print(prefix, len(packet1))
        data.append(len(packet1))
    print(len(data))
    # a Process target can't simply `return data` to the parent,
    # so the result goes back through the queue;
    # `process_number` is used to put all results in the correct order
    queue.put([process_number, data])

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=get_data, args=(0, q, 'ssdp', 5, '1 >'))
    p2 = Process(target=get_data, args=(1, q, 'ssdp', 5, '2 >'))
    p1.start()
    p2.start()
    print('waiting ...')
    p1.join()
    p2.join()
    results = [q.get(), q.get()]
    print('--- original order ---')
    print(results)  # they can be in a different order - [[1, ...], [0, ...]]
    print('--- sorted ---')
    results = sorted(results)
    print(results)  # now sorted by process_number - [[0, ...], [1, ...]]
Results:
--- original order ---
[[1, [375, 366, 418, 430, 446]], [0, [375, 366, 418, 430, 446]]]
--- sorted ---
[[0, [375, 366, 418, 430, 446]], [1, [375, 366, 418, 430, 446]]]
Or I would use Pool, which should return the results in the correct order. The code is also simpler because I don't have to use a Queue manually.
from multiprocessing import Pool
import pyshark

def get_data(my_filter, my_count, prefix="1 >"):
    cap = pyshark.LiveCapture(interface='1', display_filter=my_filter)
    data = []
    for packet1 in cap.sniff_continuously(packet_count=my_count):
        print(prefix, len(packet1))
        data.append(len(packet1))
    print(len(data))
    return data

if __name__ == '__main__':
    pool = Pool(2)
    data = [('ssdp', 5, '1 >'), ('ssdp', 5, '2 >')]
    print('waiting ...')
    results = pool.starmap(get_data, data)  # blocks until both workers finish
    print('results:', results)
Upvotes: 1