Reputation: 275
A data file that i need to read is too big, and loading it to a list takes too long time. How can I use multiproces for this? In other words, I would like to parallelise the process file read and load to a list. Could you please help.
Basically, I have a data table, that I need to load to a list, something as below. Reading the file does not take time, but loading it to a list (myList) takes about 1 minute. So, is it possible to parallelise this:
def load_file(self, fileName):
time_start = time.time()
myList = []
# mySet = set()
lines = self.read_file(fileName)
# time_end = time.time()
# print fileName, ": loaded ", round(time_end-time_start, 4)," seconds"
for line in lines:
content = line.split()
myList.append(content)
time_end = time.time()
print fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds"
return myList
def read_file(self, fileName):
filePath = self.data_directory + '\\' + fileName
try:
with open(filePath, 'r') as f:
lines = f.readlines()
f.close()
return lines
except ValueError:
print filePath + ' does not exist'
An adhoc way could be, (assume that the file has 2M lines, so len(lines) = 2M), load first 1M to myList1, and second 1M to myList2 in parallel, and then merge them, myList = myList1+myList2. But this doesn't sound like the best practice.
Upvotes: 0
Views: 773
Reputation: 16037
basically it is never a good idea call file.readlines() on a large file. I'm not sure what this lines does
self.read_file(fileName)
but I'm afraid it calls file.readlines().
Normally you do not want to have millions of lines of a large file in a list. That'll eat up your memory.
If you want to filter/transform the lines of a large file and then write the result lines into an other file, then use iterators instead of loading lines in a list.
I suggest try to organize your solution along these lines. This approach can easily handle files if size several gigabytes.
def split_lines(file):
with open(file) as f:
for line in f:
yield line.split()
def process_splitted_lines(file):
for splitted_line in split_lines(file):
<do some other thing with splitted line>
yield something
def write_result_lines(file):
for something in process_splitted_lines(file):
line = <do some other thing with something>
<write line to resultfile>
Upvotes: 1
Reputation: 8491
I did some testing, that was fun, HA HA. I do not think this is very efficient :) Maybe there is another efficient way?
import time
import multiprocessing
## Generate sample big file (~158Mo, 2M lines)
import random
chunks = "Lorem ipsum dolor sit amet consectetur adipisicing elit sed do eiusmod tempor incididunt ut labore et dolore magna aliqua".split()
with open(r"D:\testbig.txt", "w", encoding="utf-8") as f:
for i in range(2000000):
for nch in range(random.randrange(5,20)):
f.write(random.choice(chunks))
f.write(" ")
f.write("\n")
# Proposed direct way
fileName = "foo"
time_start = time.time()
myList = []
# mySet = set()
with open(r"D:\testbig.txt", "r", encoding="utf-8") as f:
lines = f.readlines()
time_end = time.time()
print(fileName, ": loaded ", round(time_end-time_start, 4)," seconds" )
for line in lines:
content = line.split()
myList.append(content)
time_end = time.time()
print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds")
del myList
# Results:
## foo : loaded 0.9204 seconds
## foo : 2000000 rows loaded in 6.9107 seconds
## Press any key to continue . . .
# Workers method:
MAXPROCESS = 7
CHUNKLEN = 25600000
# The worker
def splitter(lines):
myList = []
for line in lines:
content = line.split()
myList.append(content)
return myList
# The code has to be fully loaded, therefore in a function
def main():
fileName = "foo"
time_start = time.time()
# Declare a pool of workers
pool = multiprocessing.Pool(MAXPROCESS)
results = []
with open(r"D:\testbig.txt", "r", encoding="utf-8") as f:
while True:
# Read an amount of lines (about CHUNKLEN bytes)
lines = f.readlines(CHUNKLEN)
# End of file breaks the loop
if len(lines) == 0: break
# Queue data to be processed
results.append(pool.apply_async(splitter, (lines,)))
time_end = time.time()
print(fileName, ": loaded ", round(time_end-time_start, 4)," seconds" )
# Wait for queue to be processed
pool.close()
pool.join()
# Put list pieces together
myList = []
for result in results:
myList += result.get()
time_end = time.time()
print(fileName, ": ", len(myList), " rows loaded in", round(time_end-time_start, 4)," seconds")
main()
# Results:
# MAXPROCESS = 4
# CHUNKLEN = 8192
## foo : loaded 5.0075 seconds
## foo : 2000000 rows loaded in 11.0446 seconds
## Press any key to continue . . .
# MAXPROCESS = 7
# CHUNKLEN = 25600
## foo : loaded 6.0839 seconds
## foo : 2000000 rows loaded in 9.1102 seconds
## Press any key to continue . . .
# MAXPROCESS = 7
# CHUNKLEN = 25600000
## foo : loaded 3.1199 seconds
## foo : 2000000 rows loaded in 11.7622 seconds
## Press any key to continue . . .
Upvotes: 0