Reputation: 15
I'm trying to help someone out with something. I'm by no means an expert programmer, but what I'm trying to do is calculate a value from one CSV based on the year and ID from another CSV. The program works as intended if I statically set a smaller sample size for testing purposes (amount_of_reviews works with a 180 MB CSV). But when I run it on ALL the data, I seem to be missing about 2000 of the expected 20245 results (one of the processes fails, perhaps?). I am using multiprocessing to reduce the program's runtime. I'll just post all my code here and hope someone with experience can spot my mistake(s).
import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime
from ctypes import c_char_p

print(datetime.datetime.now())

with open('D:/temp/listings.csv', encoding="utf8") as f:
    reader = csv.reader(f)
    f.seek(0)
    idSet = set()
    for row in reader:
        idSet.add(row[0])
    idList = list(idSet)
    idList = sorted(idList)

listings = []

def amount_of_reviews_2019(id):
    total = 0
    with open('D:/temp/reviews.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(0)
        next(reader)
        for row in reader:
            if int(row[2][:4]) >= 2019 and row[0] == id:
                total = total + 1
    return total

def calc(id):
    with open('D:/temp/listings.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(1)
        listing = []
        for row in reader:
            if row[0] == id:
                listing.append(row[0])
                listing.append(row[48])
                listing.append(row[49])
                listing.append(amount_of_reviews_2019(id))
        listings.append(listing)
        print(len(listings))

def format_csv(data, lock):
    with lock:
        with open('D:/temp/multiprocessing.csv', 'a+', newline='', encoding="utf8") as csvfile:
            filewriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            print(data)
            filewriter.writerows(data)
            # for y in data:
            #     filewriter.writerow([y[0], y[1], y[2], y[3]])

def do(counter, lock):
    for id in idList:
        if counter.value < len(idList):  # len(idList) = 20245; when I put, say, 15 here I get all 15 expected results
            with counter.get_lock():
                counter.value += 1  # I am aware I skip the 0 index here
            print(counter.value)
            calc(idList[counter.value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()
    print(len(idList))
    sharedCounter = Value('i', 0)
    processes = []
    for i in range(os.cpu_count()):
        print('registering process %d' % i)
        processes.append(Process(target=do, args=(sharedCounter, lock)))
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print(datetime.datetime.now())
Upvotes: 1
Views: 301
Reputation: 5940
This code looks like it has a race condition:

with counter.get_lock():
    counter.value += 1  # I am aware I skip the 0 index here
print(counter.value)
calc(idList[counter.value])

You increment the counter while holding a lock on it, which is fine. However, in idList[counter.value] you then query the value of the counter outside the lock, so another thread/process may have changed the counter in the meantime. In that case you will read an unexpected value from the counter. A safe way to write your code would be this:
value = None
with counter.get_lock():
    counter.value += 1  # I am aware I skip the 0 index here
    value = counter.value
print(value)
calc(idList[value])
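For illustration, the same claim-the-next-index idea can be packaged in a small helper (a sketch of my own; claim_next_index is a hypothetical name, not something from the original code):

def claim_next_index(counter, limit):
    # Atomically reserve and return the next index, or None when exhausted.
    # Both the bounds check and the increment happen under the same lock,
    # so no two workers can ever claim the same index.
    with counter.get_lock():
        if counter.value < limit:
            value = counter.value
            counter.value += 1
            return value
    return None

Each worker would then loop on claim_next_index(counter, len(idList)) and stop once it returns None.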
EDIT: Here is a version of your code that has all race conditions removed (I believe) and also has the file I/O removed. It works correctly for me. Maybe you can add back the file I/O piece by piece and see where things go wrong.
import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime

print(datetime.datetime.now())

idSet = set(range(20245))
idList = list(idSet)
idList = sorted(idList)
listings = []
totalCounter = Value('i', 0)

def calc(id):
    listing = []
    listings.append(listing)

def format_csv(data, lock):
    with lock:
        totalCounter.value += len(data)

def do(counter, lock):
    for id in idList:
        value = None
        with counter.get_lock():
            if counter.value < len(idList):
                value = counter.value
                counter.value += 1
        if value is not None:
            calc(idList[value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()
    sharedCounter = Value('i', 0)
    processes = []
    for i in range(os.cpu_count()):
        processes.append(Process(target=do, args=(sharedCounter, lock)))
    for process in processes:
        process.start()
    for process in processes:
        process.join()
    print(datetime.datetime.now())
    print('len(idList): %d, total: %d' % (len(idList), totalCounter.value))
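If you then want to reintroduce the output step, a minimal sketch (my illustration, reusing the asker's original output path and writer settings) keeps the entire write inside the shared lock:

import csv

def format_csv(data, lock):
    # All file writes are serialized behind the one shared lock, so
    # concurrent workers cannot interleave rows in the output file.
    with lock:
        with open('D:/temp/multiprocessing.csv', 'a+', newline='', encoding='utf8') as csvfile:
            writer = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            writer.writerows(data)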
Upvotes: 2
Reputation: 2702
Without having given it an in-depth look, I would say there are two main culprits here, and they both go hand-in-hand:
First, there's the repeated file parsing and iteration. You iterate over every ID in the "main loop", so 20,025 times. For each ID, you then read and iterate over the entire listings file (20,051 lines) and the entire reviews file (493,816 lines). That adds up to reading a cool 20,025 × (20,051 + 493,816) = 10,290,186,675 lines of CSV.
Second, there's the multiprocessing itself. I haven't given it an in-depth look, but I think it's fair to say that we can get a good idea of the problem just from the code. As we saw above, for each ID your program opens both CSV files. Having a bunch of processes which all need to read the same two files, roughly 20,000 times in total, can't be good for performance. I wouldn't be entirely surprised if the code ran faster without the multiprocessing than with it. There's also the potential race condition mentioned by Daniel Junglas.
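For what it's worth, the usual way to spread this kind of per-ID work over processes is to let a multiprocessing.Pool do the chunking instead of hand-rolling a shared counter. A minimal sketch (the IDs and the worker body here are placeholders, not the asker's real data):

from multiprocessing import Pool

def count_reviews(listing_id):
    # Placeholder for the real per-ID work (counting that listing's
    # reviews from 2019 onwards).
    return listing_id, 0

if __name__ == '__main__':
    id_list = ['2818', '20168', '25428']  # hypothetical sample IDs
    with Pool() as pool:
        results = pool.map(count_reviews, id_list)
    print(results)

Because the Pool hands each worker its own slice of the ID list, there is no shared counter to race on.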
Alright, it's still a mess, but I just wanted to get something out there before the turn of the century. I will keep searching for a better solution. Based on, amongst other things, the number of listings which appear in reviews but not in listings.csv, the ideal solution might be slightly different.
import numpy as np
import pandas as pd
listings_df = pd.read_csv('../resources/listings.csv', header=0, usecols=['id'], dtype={'id': str})
reviews_df = pd.read_csv('../resources/reviews.csv', header=0, parse_dates=['date'], dtype={'listing_id': str})
valid_reviews = reviews_df[reviews_df['date'] >= pd.Timestamp(year=2019, month=1, day=1)]
review_id_counts = valid_reviews['listing_id'].value_counts()
counts_res: pd.DataFrame = pd.merge(listings_df, review_id_counts, left_on='id', right_index=True, how='left').rename(columns={'listing_id': 'review_count'})
counts_res['review_count'] = counts_res['review_count'].fillna(0).astype(np.int64)
counts_res.to_csv(path_or_buf='../out/listing_review_counts.csv', index=False)
Runtime is around 1s, which means I did beat my target of 5 seconds or less. Yay :)
This method uses a dictionary to count reviews, and the standard csv module. Bear in mind that it will throw an error if a review is for a listing which is not in listings.csv (a tolerant variant is sketched after the code).
import csv
import datetime

with open('../resources/listings.csv') as listings_file:
    reader = csv.DictReader(listings_file)
    listing_review_counts = dict.fromkeys((row['id'] for row in reader), 0)

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    for row in reader:
        rev_date = datetime.datetime.fromisoformat(row['date']).date()
        if rev_date >= cutoff_date:
            listing_review_counts[row['listing_id']] += 1

with open('../out/listing_review_counts_2.csv', 'w', newline='') as out_file:
    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(listing_review_counts.items())
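If your data does contain reviews whose listing_id never appears in listings.csv, a tolerant variant of the counting loop (my tweak, not part of the original answer) simply skips those rows instead of raising a KeyError; the surrounding blocks stay the same:

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    for row in reader:
        rev_date = datetime.datetime.fromisoformat(row['date']).date()
        # Only count reviews for listings we actually know about.
        if rev_date >= cutoff_date and row['listing_id'] in listing_review_counts:
            listing_review_counts[row['listing_id']] += 1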
This method uses collections.Counter and the standard csv module. Since a Counter returns 0 for missing keys, listings with no qualifying reviews are handled automatically here.
import collections as colls
import csv
import datetime

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    review_listing_counts = colls.Counter(
        row['listing_id'] for row in reader
        if datetime.datetime.fromisoformat(row['date']).date() >= cutoff_date)

with open('../resources/listings.csv') as listings_file, \
        open('../out/listing_review_counts_3.csv', 'w', newline='') as out_file:
    reader = csv.DictReader(listings_file)
    listings_ids = (row['id'] for row in reader)
    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows((curr_id, review_listing_counts[curr_id]) for curr_id in listings_ids)
Let me know if you have any questions, if I should include some explanations, etc. :)
Upvotes: 1
Reputation: 1629
I would suggest using pandas to read the files (thanks Alexander), and then looping through the listings and summing all reviews that have that specific ID and are dated after the start of 2019:
import numpy as np
import pandas
import datetime
import time

listing_csv_filename = r'listings.csv'
reviews_csv_filename = r'reviews.csv'

start = time.time()
df_listing = pandas.read_csv(listing_csv_filename, delimiter=',', quotechar='"')
df_reviews = pandas.read_csv(reviews_csv_filename, delimiter=',', parse_dates=[1])

values = list()
valid_year = df_reviews['date'] > datetime.datetime(2019, 1, 1, 0, 0, 0)
for id_num in df_listing['id']:
    valid = (df_reviews['listing_id'] == id_num) & valid_year
    values.append((id_num, np.sum(valid)))

print(values)
print(time.time() - start)
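Note that the loop above rebuilds a full boolean mask over all the reviews for every single listing. Continuing from the dataframes already loaded above, a hedged alternative (my sketch, similar in spirit to the value_counts approach in the other pandas answer) counts the filtered reviews once and then looks each ID up in the result:

# Count each listing's post-2019 reviews once, then look the IDs up.
counts = df_reviews.loc[valid_year, 'listing_id'].value_counts()
values = [(id_num, int(counts.get(id_num, 0))) for id_num in df_listing['id']]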
Upvotes: 1