Reputation: 429
I've been trying to update a nested dictionary using multiprocessing.
I'm able to add the information I want when the dictionary's values are lists of elements, but if the value is a nested dictionary I don't see any changes.
I'm aware that the multiprocessing module's documentation says the managed dictionary is a DictProxy and not a dict, and I've tried adapting the example from the module's documentation, but I haven't had any luck.
import socket
import csv
from pprint import pprint
from multiprocessing import Pool,Process,current_process,Manager

def dns_lookup(aggregate,ip):
    try:
        hostname=socket.gethostbyaddr(ip)[0]
        proc_name = current_process().name
        #print(str(hostname) + " extracted from ip " + str(ip) + " by process id: " + str(proc_name) )
        aggregate[ip]+=hostname
    except Exception as e:
        pass

if __name__=='__main__':
    procs=[]
    manager=Manager()
    aggregate=manager.dict()
    with open("ip_list.csv","r") as ipfile:
        csv_reader=csv.reader(ipfile)
        for index,row in enumerate(csv_reader):
            if index == 0:
                pass
            else:
                aggregate[row[0]]=row[1:]
                #ips.append((row[0]))
                proc = Process(target=dns_lookup, args=(aggregate,row[0],))
                procs.append(proc)
                proc.start()
    for proc in procs:
        proc.join()
    pprint(dict(aggregate))
The above code works, but if I change the original dict entry to

aggregate[row[0]]={'Other Items':row[1:]}

and then try to update it inside dns_lookup as

d['hostname']=hostname
aggregate[ip]=d
#aggregate[ip]+=d

it doesn't have any effect.
I need each entry to hold a dictionary rather than a list of elements.
The current file is small, but I will have to scale this up to about 10k lookups, so multiprocessing is required.
Any help is much appreciated.
Thanks, Karan
Upvotes: 2
Views: 1190
Reputation: 11593
Yes, it seems that an update on the inner dict is not propagated to the other processes. Even migrating the inner dict to a manager.dict() doesn't solve the problem. What works is creating a new dict from scratch and assigning it to aggregate[ip]:

aggregate[ip] = {"hostname": hostname, "Other Items": aggregate[ip]['Other Items']}
This is maybe a bug, but I would advise you to make bigger changes to your code anyway. There are two shortcomings:

1. You use aggregate both as a "queue" of IPs that still need to be looked up and as a result container that the processes write to. If you split this up into a queue and a dict that just holds the results, you avoid the problem you have: then you only read from the queue and only write into the result container aggregate.

2. You start one process per IP, but a machine can only run number-of-cores processes at a time. On Linux you'd waste a lot of memory that is not needed; on Windows you would start 1'000 python programs from scratch. Use a Pool instead and let Python figure out the number of cores and distribute your work across these processes.

I have reworked your code into this:
import socket
import csv
from pprint import pprint
from queue import Empty  # multiprocessing.Queue.get_nowait() raises queue.Empty
from multiprocessing import Pool, Queue, current_process, Manager

def dns_lookup(aggregate, queue):
    while not queue.empty():  # live as long as there are items in the queue
        try:
            row = queue.get_nowait()
        except Empty:  # another worker grabbed the last item between empty() and get()
            break
        ip = row[0]
        other_items = row[1:]
        try:
            hostname = socket.gethostbyaddr(ip)[0]
        except OSError:
            continue  # skip addresses that cannot be resolved, as in the original code
        aggregate[ip] = {
            "hostname": hostname,
            "other items": other_items,
            "process_name": current_process().name}

if __name__ == '__main__':
    manager = Manager()
    aggregate = manager.dict()
    queue = Queue()
    with open("ip_list.csv", "r") as ipfile:
        csv_reader = csv.reader(ipfile)
        next(csv_reader)  # skip the header row (instead of the if index == 0: pass)
        for row in csv_reader:  # fill the queue before starting any processes
            queue.put(row)
    # start x processes, where None says to take x = the number of cpus returned by `cpu_count()`
    pool = Pool(None, dns_lookup, (aggregate, queue))
    pool.close()  # signal that we won't submit any more tasks to the pool
    pool.join()   # wait until all processes are done
    pprint(dict(aggregate))
Besides: you would be better off using Threads instead of multiprocessing, since your processes will be blocked by networking and not by the CPU. Multiprocessing only makes sense if you can occupy one CPU core to 100%.
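For example, here is a minimal sketch of the same lookup with a thread pool (concurrent.futures is my suggestion, not something from your original code); it assumes the same ip_list.csv layout with a header row:

import csv
import socket
from concurrent.futures import ThreadPoolExecutor
from pprint import pprint

def dns_lookup(row):
    ip, other_items = row[0], row[1:]
    try:
        hostname = socket.gethostbyaddr(ip)[0]
    except OSError:  # covers socket.herror / socket.gaierror
        return None  # skip unresolvable addresses, as in your original code
    return ip, {"hostname": hostname, "other items": other_items}

if __name__ == '__main__':
    with open("ip_list.csv", "r") as ipfile:
        rows = list(csv.reader(ipfile))[1:]  # drop the header row
    # 32 threads is an arbitrary choice; tune it to your network
    with ThreadPoolExecutor(max_workers=32) as pool:
        results = list(pool.map(dns_lookup, rows))
    aggregate = dict(r for r in results if r is not None)
    pprint(aggregate)

Since the threads spend almost all their time blocked in gethostbyaddr(), the GIL is not a bottleneck, and because the results are collected from return values there is no need for a Manager or any shared dict at all.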
Upvotes: 2