user1977050

Reputation: 526

multiprocessing errors: logger cannot be pickled, EOFError: Ran out of input

I'm trying to start a process as shown below, but I get the errors mentioned in the title on the start() call.

import pyshark
from multiprocessing import Process
import pandas as pd

def get_data(cap, df):
    for packet1 in cap.sniff_continuously(packet_count=100):
        print("ssdp")
        print(len(packet1))
        df.append(len(packet1))
        
    print(df.size)
        

if __name__ == '__main__':
    cap1 = pyshark.LiveCapture(interface='1', display_filter='ssdp')
    df_sspd=pd.DataFrame()
    sspd_p = Process(target=get_data, args=(cap1,df_sspd))
    sspd_p.start()    

1st:

  File "C:\Users\BMWE\anaconda3\lib\logging\__init__.py", line 1727, in __reduce__
    raise pickle.PicklingError('logger cannot be pickled')

  PicklingError: logger cannot be pickled

and 2nd:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:\Users\BMWE\anaconda3\lib\multiprocessing\spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:\Users\BMWE\anaconda3\lib\multiprocessing\spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

Searching the web, I found some similar reports, but the solution is not clear to me.

Upvotes: 0

Views: 4717

Answers (2)

Eli

Reputation: 25

The problem for me was that I was passing a running object, so I instantiated the object inside the target function instead. Check the same for cap1 and df_sspd.
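
A rough sketch of that idea with the objects from the question (not tested here; it assumes the same pyshark/tshark setup and builds the DataFrame from a plain list inside the child process):

import pyshark
import pandas as pd
from multiprocessing import Process

def get_data():
    # capture and DataFrame are both created inside the child process,
    # so nothing unpicklable has to be passed as an argument
    cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')
    sizes = []
    for packet in cap.sniff_continuously(packet_count=100):
        sizes.append(len(packet))
    df = pd.DataFrame({'size': sizes})
    print(df.size)

if __name__ == '__main__':
    p = Process(target=get_data)  # no args, so nothing needs to be pickled
    p.start()
    p.join()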

Upvotes: 0

furas

Reputation: 142859

multiprocessing uses pickle to send arguments to processes - the problem is that pickle can send normal data but not a running object like cap.
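
You can reproduce the error without multiprocessing by pickling the capture object yourself - a quick check, assuming the same setup as in the question:

import pickle
import pyshark

cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')

try:
    # multiprocessing does the same thing with every value in args=(...)
    pickle.dumps(cap)
except Exception as e:  # e.g. PicklingError: logger cannot be pickled
    print('cannot pickle capture object:', e)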

I would rather create cap directly in the functions and, if needed, send only the display_filter.

I tested it with two functions which do the same thing, and it seems they can run at the same time and both get the same data - so there is no problem with sharing data from the interface.

from multiprocessing import Process
import pyshark

def get_data_1():

    cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')

    data = []

    for packet1 in cap.sniff_continuously(packet_count=100):
        print("1 >", len(packet1))
        data.append(len(packet1))
        
    print(len(data))

def get_data_2():

    #cap = pyshark.LiveCapture(interface='1', display_filter='udp.port==12345')    
    cap = pyshark.LiveCapture(interface='1', display_filter='ssdp')

    data = []

    for packet1 in cap.sniff_continuously(packet_count=100):
        print("2 >", len(packet1))
        data.append(len(packet1))
        
    print(len(data))

if __name__ == '__main__':
    p1 = Process(target=get_data_1)
    p2 = Process(target=get_data_2)
    
    p1.start() 
    p2.start()     
    
    print('Press Ctrl+C to stop')

If the functions are going to be similar, then I would create one function and run it with different arguments.

from multiprocessing import Process
import pyshark

def get_data(my_filter, my_count, prefix="1 >"):

    cap = pyshark.LiveCapture(interface='1', display_filter=my_filter)

    data = []

    for packet1 in cap.sniff_continuously(packet_count=my_count):
        print(prefix, len(packet1))
        data.append(len(packet1))
        
    print(len(data))

if __name__ == '__main__':
    p1 = Process(target=get_data, args=('ssdp', 100, '1 >'))
    #p2 = Process(target=get_data, args=('udp.port==12345', 100, '2 >'))
    p2 = Process(target=get_data, args=('ssdp', 100, '2 >'))
    
    p1.start() 
    p2.start()     
    
    print('Press Ctrl+C to stop')

EDIT:

There is also a problem with returning data. I use Queue for this. Because the processes may finish their jobs in a different order, I use process_number to sort the data back into the correct order.

from multiprocessing import Process, Queue
import pyshark

def get_data(process_number, queue, my_filter, my_count, prefix="1 >"):

    cap = pyshark.LiveCapture(interface='1', display_filter=my_filter)

    data = []

    for packet1 in cap.sniff_continuously(packet_count=my_count):
        print(prefix, len(packet1))
        data.append(len(packet1))
        
    print(len(data))
    
    # instead of `return data`, put the result into the queue;
    # `process_number` is used later to put all results back in the correct order
    queue.put([process_number, data])

if __name__ == '__main__':
    q = Queue()
    
    p1 = Process(target=get_data, args=(0, q, 'ssdp', 5, '1 >'))
    p2 = Process(target=get_data, args=(1, q, 'ssdp', 5, '2 >'))
    
    p1.start() 
    p2.start()     
    
    print('waiting ...')
    
    p1.join()
    p2.join()

    results = [q.get(), q.get()]
    print('--- original order ---')
    print(results) # they can be in different order - [[1, ....], [0, ....]]
    print('--- sorted ---')
    results = sorted(results)
    print(results) # now sorted by process_number - [[0, ....], [1, ....]]
    

Results

--- original order ---
[[1, [375, 366, 418, 430, 446]], [0, [375, 366, 418, 430, 446]]]
--- sorted ---
[[0, [375, 366, 418, 430, 446]], [1, [375, 366, 418, 430, 446]]]

Or I would use Pool, which returns results in the correct order. The code is also simpler because I don't have to use a Queue manually.

from multiprocessing import Pool
import pyshark

def get_data(my_filter, my_count, prefix="1 >"):
    
    cap = pyshark.LiveCapture(interface='1', display_filter=my_filter)

    data = []

    for packet1 in cap.sniff_continuously(packet_count=my_count):
        print(prefix, len(packet1))
        data.append(len(packet1))
        
    print(len(data))
    
    return data

if __name__ == '__main__':

    pool = Pool(2)
    
    data = [('ssdp', 5, '1 >'), ('ssdp', 5, '2 >')]
    
    print('waiting ...')

    results = pool.starmap(get_data, data)  # blocks until both workers finish

    print('results:', results)
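
As a small variant (assuming Python 3.3+), Pool can also be used as a context manager so it is shut down automatically:

from multiprocessing import Pool
import pyshark

def get_data(my_filter, my_count, prefix="1 >"):
    cap = pyshark.LiveCapture(interface='1', display_filter=my_filter)
    data = []
    for packet1 in cap.sniff_continuously(packet_count=my_count):
        print(prefix, len(packet1))
        data.append(len(packet1))
    return data

if __name__ == '__main__':
    data = [('ssdp', 5, '1 >'), ('ssdp', 5, '2 >')]

    # the with block terminates the pool automatically when it ends
    with Pool(2) as pool:
        results = pool.starmap(get_data, data)  # blocks until all workers finish

    print('results:', results)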
    

Upvotes: 1
