Karanpreet Singh

Reputation: 429

Update a nested dict with a Manager from the multiprocessing module

I've been trying to update a nested dictionary using multiprocessing.

I'm able to add the information I want if the dictionary's values are lists of elements, but if the value is a nested dictionary I don't see any changes.

I'm aware that the multiprocessing docs say it's a DictProxy and not a dict, and I've tried adapting the example from the module documentation, but I haven't had any luck.

import socket
import csv
from pprint import pprint
from multiprocessing import Pool,Process,current_process,Manager

def dns_lookup(aggregate,ip):
    try:
        hostname=socket.gethostbyaddr(ip)[0]
        proc_name = current_process().name
        #print(str(hostname) + " extracted from ip " + str(ip) + " by process id: " + str(proc_name) )
        aggregate[ip]+=hostname
    except Exception as e:
        pass

if __name__=='__main__':
    procs=[]
    manager=Manager()
    aggregate=manager.dict()
    with open("ip_list.csv","r") as ipfile:
        csv_reader=csv.reader(ipfile)
        for index,row in enumerate(csv_reader):
            if index == 0:
                pass
            else:
                aggregate[row[0]]=row[1:]
                #ips.append((row[0]))
                proc = Process(target=dns_lookup, args=(aggregate,row[0],))
                procs.append(proc)
                proc.start()

    for proc in procs:
        proc.join()
    pprint(dict(aggregate))

The above code works, but if I try to change the original dict to

aggregate[row[0]]={'Other Items':row[1:]}

and then try to update it like this:

d['hostname']=hostname
aggregate[ip]=d
#aggregate[ip]+=d

it doesn't have any effect.

I need the values to be dictionaries, not lists of elements.

The current file is small, but I will have to scale this up to about 10k lookups, so multiprocessing is required.

Any help is much appreciated.

Thanks, Karan

Upvotes: 2

Views: 1190

Answers (1)

hansaplast

Reputation: 11593

Yes, it seems that the update to the nested dict is not propagated back through the manager. Even making the inner dict a manager.dict() doesn't solve the problem. What does work is building a new dict from scratch and assigning it to aggregate[ip]:

aggregate[ip] = {"hostname": hostname, "Other Items": aggregate[ip]['Other Items']}

This may be a bug, but I would advise you to make bigger changes to your code anyway. There are two shortcomings:

  1. You use aggregate both as a "queue" of IPs that still need to be looked up and as a result container that the processes write to. Splitting this into a queue and a dict that only holds results avoids the problem you have: then you only read from the queue and only write into the result container aggregate.
  2. If you have 1,000 lines in your CSV file, you'd end up with 1,000 processes, whereas your computer can only serve number-of-cores processes at a time. On Linux you'd waste a lot of memory; on Windows you would start 1,000 Python interpreters from scratch. Use a Pool instead and let Python figure out the number of cores and distribute the work across that many processes.

I have reworked your code into this:

import socket
import csv
from pprint import pprint
from queue import Empty  # multiprocessing queues raise queue.Empty when drained
from multiprocessing import Pool, Queue, current_process, Manager

def dns_lookup(aggregate, queue):
    while True: # live as long as there are items in the queue
        try:
            row = queue.get_nowait()
        except Empty: # queue is drained, this worker is done
            break
        ip = row[0]
        other_items = row[1:]
        try:
            hostname = socket.gethostbyaddr(ip)[0]
        except OSError: # lookup failed, skip this ip (like the try/except in your original code)
            continue
        aggregate[ip] = {
            "hostname": hostname,
            "other items": other_items,
            "process_name": current_process().name}

if __name__=='__main__':
    procs=[]
    manager=Manager()
    aggregate=manager.dict()
    queue = Queue()
    with open("ip_list.csv","r") as ipfile:
        csv_reader=csv.reader(ipfile)
        next(csv_reader) # skip the header row (instead of the "if index == 0: pass")

        for row in csv_reader: # fill queue before starting any processes
            queue.put(row)

        # start x processes, where None says to take x = the number of cpus returned by `cpu_count()`
        pool = Pool(None, dns_lookup, (aggregate, queue))
        pool.close() # signal that we won't submit any more tasks to pool
        pool.join() # wait until all processes are done
        pprint(dict(aggregate))

Besides: you would be better off using threads instead of multiprocessing, since your workers are blocked by networking and not by the CPU. Multiprocessing only makes sense if you need to keep a CPU core busy at 100%.
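
For illustration, a minimal sketch of that thread-based variant (assuming the same ip_list.csv layout as above; max_workers=50 is an arbitrary pick you would tune). Since threads share memory, a plain dict is enough and no Manager is needed:

import csv
import socket
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint

def dns_lookup(row):
    ip, other_items = row[0], row[1:]
    try:
        hostname = socket.gethostbyaddr(ip)[0]
    except OSError:
        hostname = None  # lookup failed
    return ip, {"hostname": hostname, "other items": other_items}

if __name__ == '__main__':
    with open("ip_list.csv", "r") as ipfile:
        csv_reader = csv.reader(ipfile)
        next(csv_reader)  # skip the header row
        rows = list(csv_reader)

    results = {}
    with ThreadPoolExecutor(max_workers=50) as executor:
        # threads share memory, so results can be collected in a normal dict
        for ip, info in executor.map(dns_lookup, rows):
            results[ip] = info
    pprint(results)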

Upvotes: 2
