imagine93

Reputation: 15

Multiprocessing big CSVs does not return the expected number of rows

I'm trying to help someone out with something. I'm by no means an expert programmer, but what I'm trying to do is calculate a value from one CSV based on the year and ID from another CSV. The program works as intended if I hard-code a smaller sample size for timing and testing purposes (amount_of_reviews works with a 180 MB CSV). But when I run it on ALL the data, I seem to be missing about 2000 of the expected 20245 results (perhaps one of the processes fails?). I am using multiprocessing to reduce the time the program takes to run. I will post all my code here in the hope that someone with experience can spot my mistake(s).

import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime
from ctypes import c_char_p

print (datetime.datetime.now())

with open('D:/temp/listings.csv', encoding="utf8") as f:
    reader = csv.reader(f)
    f.seek(0)

    idSet = set()
    for row in reader:
        idSet.add(row[0])

idList = list(idSet)
idList = sorted(idList)
listings = []

def amount_of_reviews_2019(id):
    total = 0
    with open('D:/temp/reviews.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(0)
        next(reader)

        for row in reader:
            if int(row[2][:4]) >= 2019 and row[0] == id:
                total = total + 1
        return total


def calc(id):
    with open('D:/temp/listings.csv', encoding="utf8") as f:
        reader = csv.reader(f)
        f.seek(1)
        listing = []
        for row in reader:
            if row[0] == id: 
                listing.append(row[0])
                listing.append(row[48])
                listing.append(row[49])
                listing.append(amount_of_reviews_2019(id))
        listings.append(listing)
        print(len(listings))

def format_csv(data, lock):
    with lock:
        with open('D:/temp/multiprocessing.csv', 'a+', newline='', encoding="utf8") as csvfile:
            filewriter = csv.writer(csvfile, delimiter=',', quotechar='|', quoting=csv.QUOTE_MINIMAL)
            print(data)
            filewriter.writerows(data)
            #for y in data:
                #filewriter.writerow([y[0], y[1], y[2], y[3]])


def do(counter, lock):
    for id in idList:
        if counter.value < len(idList): #len(idList) = 20245 #When i put lets say 15 here I get all 15 expected results
            with counter.get_lock():
                counter.value += 1 #I am aware I skip the 0 index here
                print(counter.value)
            calc(idList[counter.value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()

    print(len(idList))
    sharedCounter = Value('i', 0)

    processes = []
    for i in range(os.cpu_count()):
        print('registering process %d' % i)
        processes.append(Process(target=do, args=(sharedCounter, lock)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print (datetime.datetime.now())

Upvotes: 1

Views: 301

Answers (3)

Daniel Junglas

Reputation: 5940

This code looks like it has a race condition:

    with counter.get_lock():
        counter.value += 1 #I am aware I skip the 0 index here
        print(counter.value)
    calc(idList[counter.value])

You increment the counter while holding a lock on it, fine. However, then in idList[counter.value] you query the value of the counter outside the lock. So another thread/process may have changed the counter in the meantime. In that case you will read an unexpected value from the counter. A safe way to write your code would be this:

    value = None
    with counter.get_lock():
        counter.value += 1 #I am aware I skip the 0 index here
        value = counter.value  # copy the integer while the lock is still held
    print(value)
    calc(idList[value])

EDIT: Here is a version of your code that has all race conditions removed (I believe) and also has the file I/O removed. It works correctly for me. Maybe you can add the file I/O back piece by piece and see where things go wrong.

import csv
import os
from multiprocessing import Process, Lock, Array, Value
import datetime

print (datetime.datetime.now())

idSet = set(range(20245))
idList = list(idSet)
idList = sorted(idList)
listings = []

totalCounter = Value('i', 0)

def calc(id):
    listing = []
    listings.append(listing)

def format_csv(data, lock):
    with lock:
        totalCounter.value += len(data)

def do(counter, lock):
    for id in idList:
        value = None
        with counter.get_lock():
            if counter.value < len(idList):
                value = counter.value
                counter.value += 1
        if value is not None:
            calc(idList[value])
        else:
            format_csv(listings, lock)
            break

if __name__ == '__main__':
    lock = Lock()

    sharedCounter = Value('i', 0)

    processes = []
    for i in range(os.cpu_count()):
        processes.append(Process(target=do, args=(sharedCounter, lock)))

    for process in processes:
        process.start()

    for process in processes:
        process.join()

    print (datetime.datetime.now())
    print('len(idList): %d, total: %d' % (len(idList), totalCounter.value))
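
One caveat about the version above, offered as an assumption rather than something tested on the asker's machine: with the spawn start method (the default on Windows, which the D:/ paths suggest), each child re-imports the module, so a module-level Value such as totalCounter is created afresh in every process and the parent would still see 0. A minimal sketch that passes both shared objects as arguments instead; claim_index, worker and run are hypothetical names, and the IDs are plain integers standing in for the real ones:

```python
from multiprocessing import Process, Value


def claim_index(counter, limit):
    """Claim the next unprocessed index under the lock, or None when done."""
    with counter.get_lock():
        if counter.value >= limit:
            return None
        index = counter.value  # read and increment inside the same lock
        counter.value += 1
        return index


def worker(ids, counter, total):
    processed = []  # local to this process, like `listings` above
    while True:
        index = claim_index(counter, len(ids))
        if index is None:
            break
        processed.append(ids[index])  # stand-in for calc(ids[index])
    with total.get_lock():
        total.value += len(processed)


def run(n_ids=1000, n_procs=4):
    ids = list(range(n_ids))
    counter = Value('i', 0)  # next index to hand out
    total = Value('i', 0)    # number of IDs handled across all processes
    procs = [Process(target=worker, args=(ids, counter, total))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return total.value


if __name__ == '__main__':
    print(run())
```

If every ID is claimed exactly once, the printed total should equal the number of IDs.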

Upvotes: 2

AMC

Reputation: 2702

Diagnosis

Without having given it an in-depth look, I would say there are two main culprits here, and they both go hand-in-hand:

First, there's the repeated file parsing and iteration. You iterate over every ID in the "main loop", so 20,025 times. For each ID, you then read and iterate over the entire listings file (20,051 lines), and the entire reviews file (493,816 lines). That adds up to reading a cool 10 billion 290 million 186 thousand 675 lines of CSV.

Second, there's the multiprocessing itself. As we saw above, for each ID your program opens both CSV files. Having a bunch of processes which all need to read the same two files, 20,000 times in total, can't be good for performance. I wouldn't be entirely surprised if the code ran faster without the multiprocessing than with it. There's also the potential race condition mentioned by Daniel Junglas.
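
For anyone who wants to check the line count from the first point, here is the arithmetic spelled out. The three figures are the ones quoted above; the variable names are just for illustration:

```python
ids = 20_025              # iterations of the main loop
listings_lines = 20_051   # lines in the listings file
reviews_lines = 493_816   # lines in the reviews file

# Each ID triggers one full pass over both files.
total_lines = ids * (listings_lines + reviews_lines)
print(f"{total_lines:,}")  # 10,290,186,675
```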


Solutions

1.

Alright, it's still a mess, but I just wanted to get something out there before the turn of the century. I will keep searching for a better solution. Based on the number of listings which appear in reviews but not in listings.csv, amongst other things, the ideal solution might be slightly different.

import numpy as np
import pandas as pd

listings_df = pd.read_csv('../resources/listings.csv', header=0, usecols=['id'], dtype={'id': str})

reviews_df = pd.read_csv('../resources/reviews.csv', header=0, parse_dates=['date'], dtype={'listing_id': str})

valid_reviews = reviews_df[reviews_df['date'] >= pd.Timestamp(year=2019, month=1, day=1)]

# Name the counts explicitly: the default name of the value_counts() result differs between pandas versions
review_id_counts = valid_reviews['listing_id'].value_counts().rename('review_count')

counts_res: pd.DataFrame = pd.merge(listings_df, review_id_counts, left_on='id', right_index=True, how='left')
counts_res['review_count'] = counts_res['review_count'].fillna(0).astype(np.int64)

counts_res.to_csv(path_or_buf='../out/listing_review_counts.csv', index=False)

Runtime is around 1s, which means I did beat my target of 5 seconds or less. Yay :)

2.

This method uses a dictionary to count reviews, and the standard csv module. Bear in mind that it will throw an error if a review is for a listing which is not in listings.csv.

import csv
import datetime

with open('../resources/listings.csv') as listings_file:
    reader = csv.DictReader(listings_file)
    listing_review_counts = dict.fromkeys((row['id'] for row in reader), 0)

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    for row in reader:
        rev_date = datetime.datetime.fromisoformat(row['date']).date()
        if rev_date >= cutoff_date:
            listing_review_counts[row['listing_id']] += 1

with open('../out/listing_review_counts_2.csv', 'w', newline='') as out_file:
    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(listing_review_counts.items())
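
If the KeyError mentioned for method 2 is a problem, one possible variation is to skip reviews for unknown listings and collect their IDs for inspection. This is only a sketch: count_reviews is a hypothetical helper, and the two small in-memory CSV strings are made-up stand-ins for the real files (only the id, listing_id and date columns are assumed).

```python
import csv
import datetime
import io

# Made-up sample data standing in for listings.csv and reviews.csv.
LISTINGS_CSV = "id\n1\n2\n"
REVIEWS_CSV = (
    "listing_id,date\n"
    "1,2019-03-01\n"
    "1,2018-12-31\n"
    "3,2019-05-05\n"  # listing 3 does not appear in the listings data
)


def count_reviews(listings_file, reviews_file, cutoff_date):
    """Count reviews on or after cutoff_date per known listing ID."""
    counts = dict.fromkeys(
        (row['id'] for row in csv.DictReader(listings_file)), 0)
    unknown = set()  # listing IDs seen in reviews but not in listings
    for row in csv.DictReader(reviews_file):
        if datetime.date.fromisoformat(row['date']) < cutoff_date:
            continue
        listing_id = row['listing_id']
        if listing_id in counts:
            counts[listing_id] += 1
        else:
            unknown.add(listing_id)
    return counts, unknown


counts, unknown = count_reviews(
    io.StringIO(LISTINGS_CSV), io.StringIO(REVIEWS_CSV),
    datetime.date(2019, 1, 1))
print(counts)   # {'1': 1, '2': 0}
print(unknown)  # {'3'}
```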

3.

This method uses collections.Counter and the standard csv module.

import collections as colls
import csv
import datetime

cutoff_date = datetime.date(2019, 1, 1)

with open('../resources/reviews.csv') as reviews_file:
    reader = csv.DictReader(reviews_file)
    review_listing_counts = colls.Counter(
        (row['listing_id'] for row in reader if datetime.datetime.fromisoformat(row['date']).date() >= cutoff_date))

with open('../resources/listings.csv') as listings_file, open('../out/listing_review_counts_3.csv', 'w',
                                                              newline='') as out_file:
    reader = csv.DictReader(listings_file)
    listings_ids = (row['id'] for row in reader)

    writer = csv.writer(out_file)
    writer.writerow(('id', 'review_count'))
    writer.writerows(((curr_id, review_listing_counts[curr_id]) for curr_id in listings_ids))
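
In case it is not obvious why method 3 needs no special handling for listings without reviews: a Counter returns 0 for a key it has never seen, and does not add that key. A toy illustration with made-up IDs:

```python
from collections import Counter

review_counts = Counter(['a', 'b', 'a'])  # pretend these are listing IDs

print(review_counts['a'])        # 2
print(review_counts['zzz'])      # 0, no KeyError for an unseen key
print('zzz' in review_counts)    # False, the lookup did not insert it
```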

Let me know if you have any questions, if I should include some explanations, etc. :)

Upvotes: 1

Marc

Reputation: 1629

I would suggest using pandas to read the files (thanks Alexander), then looping through the listings and, for each one, counting the reviews that have that listing ID and are dated 2019 or later:

import numpy as np
import pandas
import datetime
import time

listing_csv_filename = r'listings.csv'
reviews_csv_filename = r'reviews.csv'
start = time.time()
df_listing = pandas.read_csv(listing_csv_filename, delimiter=',', quotechar='"')
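# Parse the date column by name, since its position may differ between versions of reviews.csv
df_reviews = pandas.read_csv(reviews_csv_filename, delimiter=',', parse_dates=['date'])
values = list()
# Use >= so that reviews dated exactly 2019-01-01 are counted, matching the question's "year >= 2019" test
valid_year = df_reviews['date'] >= datetime.datetime(2019, 1, 1, 0, 0, 0)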
for id_num in df_listing['id']:
    valid = (df_reviews['listing_id'] == id_num) & valid_year
    values.append((id_num, np.sum(valid)))

print(values)
print(time.time() - start)

Upvotes: 1
