hercules.cosmos

Reputation: 275

How to read a file and load to a list in multiprocess

The data file I need to read is too big, and loading it into a list takes too long. How can I use multiprocessing for this? In other words, I would like to parallelise reading the file and loading it into a list. Could you please help?

Basically, I have a data table that I need to load into a list, something like below. Reading the file does not take much time, but loading it into a list (myList) takes about 1 minute. So, is it possible to parallelise this:

def load_file(self, fileName):    
    time_start = time.time()
    myList = []
    # mySet = set()
    lines = self.read_file(fileName)
    # time_end = time.time()
    # print fileName, ": loaded ",  round(time_end-time_start, 4)," seconds" 
    for line in lines:  
        content = line.split()   
        myList.append(content)
    time_end = time.time()
    print fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds"    
    return myList

def read_file(self, fileName): 
    filePath = self.data_directory + '\\' + fileName
    try:
        # 'with' closes the file automatically, no explicit close() needed
        with open(filePath, 'r') as f:
            return f.readlines()
    except IOError:
        # open() raises IOError when the file is missing, not ValueError
        print filePath + ' does not exist' 

An ad hoc way could be (assume the file has 2M lines, so len(lines) = 2M): load the first 1M lines into myList1 and the second 1M into myList2 in parallel, and then merge them, myList = myList1 + myList2. But this doesn't sound like best practice.
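
Something along these lines is what I have in mind (an untested sketch; parse_chunk and the two-way split are just placeholders for the idea):

import multiprocessing

def parse_chunk(lines):
    # Split each line into its whitespace-separated fields
    return [line.split() for line in lines]

def load_in_halves(lines):
    half = len(lines) // 2
    pool = multiprocessing.Pool(2)
    try:
        # Parse the two halves in parallel child processes
        part1 = pool.apply_async(parse_chunk, (lines[:half],))
        part2 = pool.apply_async(parse_chunk, (lines[half:],))
        myList = part1.get() + part2.get()
    finally:
        pool.close()
        pool.join()
    return myList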

Upvotes: 0

Views: 773

Answers (2)

bpgergo

Reputation: 16037

Basically, it is never a good idea to call file.readlines() on a large file. I'm not sure what this line does

self.read_file(fileName)

but I'm afraid it calls file.readlines().

Normally you do not want to have millions of lines of a large file in a list. That'll eat up your memory.

If you want to filter/transform the lines of a large file and then write the result lines into another file, then use iterators instead of loading the lines into a list.

I suggest you try to organize your solution along these lines. This approach can easily handle files several gigabytes in size.

def split_lines(file):
    with open(file) as f:
        for line in f:                
            yield line.split()

def process_splitted_lines(file):
    for splitted_line in split_lines(file):
        <do some other thing with splitted line>
        yield something

def write_result_lines(file):
    for something in process_splitted_lines(file):
        line = <do some other thing with something>
        <write line to resultfile>
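
For example, if the goal were just to keep the first two columns of every row, a concrete (untested) version of this pattern could look like this; the file names are placeholders:

def split_lines(filename):
    # Yield the fields of each line; never holds the whole file in memory
    with open(filename) as f:
        for line in f:
            yield line.split()

def write_first_two_columns(infile, outfile):
    # Example transformation: keep only the first two fields of every row
    with open(outfile, "w") as out:
        for fields in split_lines(infile):
            out.write(" ".join(fields[:2]) + "\n")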

Upvotes: 1

Cilyan

Reputation: 8491

I did some testing, and it was fun, ha ha. I do not think this is very efficient :) Maybe there is a more efficient way?

import time
import multiprocessing

## Generate sample big file (~158 MB, 2M lines)
import random
chunks = "Lorem ipsum dolor sit amet consectetur adipisicing elit sed do eiusmod tempor incididunt ut labore et dolore magna aliqua".split()
with open(r"D:\testbig.txt", "w", encoding="utf-8") as f:
    for i in range(2000000):
        for nch in range(random.randrange(5,20)):
            f.write(random.choice(chunks))
            f.write(" ")
        f.write("\n")

# Proposed direct way
fileName = "foo"
time_start = time.time()
myList = []
# mySet = set()
with open(r"D:\testbig.txt", "r", encoding="utf-8") as f:
    lines = f.readlines()
time_end = time.time()
print(fileName, ": loaded ",  round(time_end-time_start, 4)," seconds" )
for line in lines:  
    content = line.split()   
    myList.append(content)
time_end = time.time()
print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds")
del myList

# Results:
## foo : loaded  0.9204  seconds
## foo :  2000000  rows loaded in 6.9107  seconds
## Press any key to continue . . .

# Workers method:
MAXPROCESS = 7
CHUNKLEN = 25600000

# The worker
def splitter(lines):
    myList = []
    for line in lines:
        content = line.split()
        myList.append(content)
    return myList

# The code has to be fully loaded, therefore in a function
def main():

    fileName = "foo"
    time_start = time.time()
    # Declare a pool of workers
    pool = multiprocessing.Pool(MAXPROCESS)
    results = []
    with open(r"D:\testbig.txt", "r", encoding="utf-8") as f:
        while True:
            # Read an amount of lines (about CHUNKLEN bytes)
            lines = f.readlines(CHUNKLEN)
            # End of file breaks the loop
            if len(lines) == 0: break
            # Queue data to be processed
            results.append(pool.apply_async(splitter, (lines,)))
    time_end = time.time()
    print(fileName, ": loaded ",  round(time_end-time_start, 4)," seconds" )
    # Wait for queue to be processed
    pool.close()
    pool.join()
    # Put list pieces together
    myList = []
    for result in results:
        myList += result.get()

    time_end = time.time()
    print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds")

# The __main__ guard is required so child processes do not re-run this script on import
if __name__ == "__main__":
    main()

# Results:

# MAXPROCESS = 4
# CHUNKLEN = 8192
## foo : loaded  5.0075  seconds
## foo :  2000000  rows loaded in 11.0446  seconds
## Press any key to continue . . .

# MAXPROCESS = 7
# CHUNKLEN = 25600
## foo : loaded  6.0839  seconds
## foo :  2000000  rows loaded in 9.1102  seconds
## Press any key to continue . . .

# MAXPROCESS = 7
# CHUNKLEN = 25600000
## foo : loaded  3.1199  seconds
## foo :  2000000  rows loaded in 11.7622  seconds
## Press any key to continue . . .
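
For comparison, here is a variant of the same idea built on pool.imap, so reading and splitting overlap and results come back in order (the worker count and chunk size are guesses, not tuned):

import itertools
import multiprocessing

def splitter(lines):
    return [line.split() for line in lines]

def read_in_chunks(path, nlines=100000):
    # Yield successive blocks of nlines lines
    with open(path, "r", encoding="utf-8") as f:
        while True:
            block = list(itertools.islice(f, nlines))
            if not block:
                break
            yield block

def load_parallel(path, workers=4):
    myList = []
    pool = multiprocessing.Pool(workers)
    try:
        # imap returns results in submission order while chunks are processed concurrently
        for part in pool.imap(splitter, read_in_chunks(path)):
            myList.extend(part)
    finally:
        pool.close()
        pool.join()
    return myList

if __name__ == "__main__":
    rows = load_parallel(r"D:\testbig.txt")
    print(len(rows), "rows loaded")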

Upvotes: 0
