Reputation: 1112
I was fiddling around with Python when my boss assigned me a rather daunting task.
He gave me a CSV file around 14 GB in size and asked me if I could inflate that CSV into a delimited file about 4 TB in size by replicating its contents several times.
For example, take this CSV:
TIME_SK,ACCOUNT_NUMBER,ACCOUNT_TYPE_SK,ACCOUNT_STATUS_SK,CURRENCY_SK,GLACC_BUSINESS_NAME,PRODUCT_SK,PRODUCT_TERM_SK,NORMAL_BAL,SPECIAL_BAL,FINAL_MOV_YTD_BAL,NO_OF_DAYS_MTD,NO_OF_DAYS_YTD,BANK_FLAG,MEASURE_ID,SOURCE_SYSTEM_ID
20150131,F290006G93996,7,1,12,DEPOSIT INSURANCE EXPENSE,502,0,865.57767676670005,0,865.57767676670005,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150131,F2900F036FF90,7,9,12,GWM BALANCE,502,0,-139.0556,0,-139.0556,30,121,N,GWM BALANCE,1
20150131,F070007GG6790,7,1,12,DEPOSIT INSURANCE EXPENSE,1008,0,14100.016698793699,0,14100.016698793699,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150131,F2F00040FG982,7,1,12,DEPOSIT INSURANCE EXPENSE,502,0,8410.4009848750993,0,8410.4009848750993,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150131,FF30009944863,7,9,12,ACCOUNT PRINCIPAL,502,0,-2367.9400000000001,0,-2367.9400000000001,30,121,N,GL BALANCE,1
20150131,F240002FG722F,7,1,12,ACCOUNT PRINCIPAL,502,0,-28978292.390000001,0,-28978292.390000001,30,121,N,GL BALANCE,1
20150131,F0G00FFF74293,7,1,12,ACCOUNT PRINCIPAL,1008,0,-855196.81000000006,0,-855196.81000000006,30,121,N,GL BALANCE,1
20150131,FF20007947687,7,9,12,GWM BALANCE,2425,0,-368.45897600000001,0,-368.45897600000001,30,121,N,GWM BALANCE,1
20150131,F200007938744,7,1,12,GWM BALANCE,502,0,-19977.173964000001,0,-19977.173964000001,30,121,N,GWM BALANCE,1
He wants me to inflate the size by replicating the contents of the CSV while altering the value of the TIME_SK column, like below:
TIME_SK,ACCOUNT_NUMBER,ACCOUNT_TYPE_SK,ACCOUNT_STATUS_SK,CURRENCY_SK,GLACC_BUSINESS_NAME,PRODUCT_SK,PRODUCT_TERM_SK,NORMAL_BAL,SPECIAL_BAL,FINAL_MOV_YTD_BAL,NO_OF_DAYS_MTD,NO_OF_DAYS_YTD,BANK_FLAG,MEASURE_ID,SOURCE_SYSTEM_ID
20150131,F290006G93996,7,1,12,DEPOSIT INSURANCE EXPENSE,502,0,865.57767676670005,0,865.57767676670005,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150131,F2900F036FF90,7,9,12,GWM BALANCE,502,0,-139.0556,0,-139.0556,30,121,N,GWM BALANCE,1
20150131,F070007GG6790,7,1,12,DEPOSIT INSURANCE EXPENSE,1008,0,14100.016698793699,0,14100.016698793699,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150131,F2F00040FG982,7,1,12,DEPOSIT INSURANCE EXPENSE,502,0,8410.4009848750993,0,8410.4009848750993,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150131,FF30009944863,7,9,12,ACCOUNT PRINCIPAL,502,0,-2367.9400000000001,0,-2367.9400000000001,30,121,N,GL BALANCE,1
20150131,F240002FG722F,7,1,12,ACCOUNT PRINCIPAL,502,0,-28978292.390000001,0,-28978292.390000001,30,121,N,GL BALANCE,1
20150131,F0G00FFF74293,7,1,12,ACCOUNT PRINCIPAL,1008,0,-855196.81000000006,0,-855196.81000000006,30,121,N,GL BALANCE,1
20150131,FF20007947687,7,9,12,GWM BALANCE,2425,0,-368.45897600000001,0,-368.45897600000001,30,121,N,GWM BALANCE,1
20150131,F200007938744,7,1,12,GWM BALANCE,502,0,-19977.173964000001,0,-19977.173964000001,30,121,N,GWM BALANCE,1
20150201,F290006G93996,7,1,12,DEPOSIT INSURANCE EXPENSE,502,0,865.57767676670005,0,865.57767676670005,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150201,F2900F036FF90,7,9,12,GWM BALANCE,502,0,-139.0556,0,-139.0556,30,121,N,GWM BALANCE,1
20150201,F070007GG6790,7,1,12,DEPOSIT INSURANCE EXPENSE,1008,0,14100.016698793699,0,14100.016698793699,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150201,F2F00040FG982,7,1,12,DEPOSIT INSURANCE EXPENSE,502,0,8410.4009848750993,0,8410.4009848750993,30,121,N,DEPOSIT INSURANCE EXPENSE,1
20150201,FF30009944863,7,9,12,ACCOUNT PRINCIPAL,502,0,-2367.9400000000001,0,-2367.9400000000001,30,121,N,GL BALANCE,1
20150201,F240002FG722F,7,1,12,ACCOUNT PRINCIPAL,502,0,-28978292.390000001,0,-28978292.390000001,30,121,N,GL BALANCE,1
20150201,F0G00FFF74293,7,1,12,ACCOUNT PRINCIPAL,1008,0,-855196.81000000006,0,-855196.81000000006,30,121,N,GL BALANCE,1
20150201,FF20007947687,7,9,12,GWM BALANCE,2425,0,-368.45897600000001,0,-368.45897600000001,30,121,N,GWM BALANCE,1
20150201,F200007938744,7,1,12,GWM BALANCE,502,0,-19977.173964000001,0,-19977.173964000001,30,121,N,GWM BALANCE,1
and so on.
I was able to write a Python script that does the task; however, when it was used on the real CSV file, tens of gigabytes in size with hundreds of millions of rows, it proved to take too long to complete (there was a time constraint at that time; however, he has asked me to do it again now).
I am using Python's built-in CSV writer. After a bit of research, I came up with two different approaches.
This is the first version of my script. It does the job all right, but it took too long to tackle the humongous CSV.
. . . omitted . . .
with open('../csv/DAILY_DDMAST.csv', 'rb') as csvinput:
    with open('../result/DAILY_DDMAST_result1'+name_interval+'.csv', 'wb') as csvoutput:
        reader = csv.reader(csvinput)
        writer = csv.writer(csvoutput, lineterminator='\r\n')

        # This part copies the original CSV to a new file
        for row in reader:
            writer.writerow(row)
        print("Done copying. Time elapsed: %s seconds, Total time: %s seconds" %
              ((time.time() - start_time), (time.time() - start_time)))

        i = 0
        while i < 5:
            # This part replicates the content of the CSV, modifying the TIME_SK value
            counter_time = time.time()
            for row in reader:
                newdate = datetime.datetime.strptime(row[0], "%Y%m%d") + datetime.timedelta(days=i)
                row[0] = newdate.strftime("%Y%m%d")
                writer.writerow(row)
            csvinput.seek(0)
            next(reader, None)
            print("Done processing for i = %d. Time elapsed: %s seconds, Total time: %s seconds" %
                  (i+1, (counter_time - start_time), (time.time() - start_time)))
            i += 1
. . . omitted . . .
In my understanding, the script iterates over each row of the CSV with for row in reader, and then writes each row to the new file with writer.writerow(row). I also found that iterating over the source file this way is a bit repetitive and time consuming, so I thought it could be more efficient with another approach...
This was intended as an "upgrade" to the first version of the script.
. . . omitted . . .
with open('../csv/DAILY_DDMAST.csv', 'rb') as csvinput:
    with open('../result/DAILY_DDMAST_result2'+name_interval+'.csv', 'wb') as csvoutput:
        reader = csv.reader(csvinput)
        writer = csv.writer(csvoutput, lineterminator='\r\n')

        csv_buffer = list()
        for row in reader:
            # Here, rather than directly writing the iterated row, I store it in a list.
            # If the list reaches 1 million rows, it is written to the file and the "bucket" is emptied
            csv_buffer.append(row)
            if len(csv_buffer) > 1000000:
                writer.writerows(csv_buffer)
                del csv_buffer[:]
        writer.writerows(csv_buffer)
        print("Done copying. Time elapsed: %s seconds, Total time: %s seconds" %
              ((time.time() - start_time), (time.time() - start_time)))

        i = 0
        while i < 5:
            counter_time = time.time()
            del csv_buffer[:]
            for row in reader:
                newdate = datetime.datetime.strptime(row[0], "%Y%m%d") + datetime.timedelta(days=i)
                row[0] = newdate.strftime("%Y%m%d")
                # Same goes here
                csv_buffer.append(row)
                if len(csv_buffer) > 1000000:
                    writer.writerows(csv_buffer)
                    del csv_buffer[:]
            writer.writerows(csv_buffer)
            csvinput.seek(0)
            next(reader, None)
            print("Done processing for i = %d. Time elapsed: %s seconds, Total time: %s seconds" %
                  (i+1, (counter_time - start_time), (time.time() - start_time)))
            i += 1
. . . omitted . . .
I thought that by storing the rows in memory and then writing them all together with writerows, I could save time. However, that was not the case. I found out that even if I store the rows to be written to the new CSV, writerows still iterates over the list and writes them to the new file, so it takes nearly as long as the first script...
At this point, I don't know whether I should come up with a better algorithm or whether there is something I could use -- something like writerows, only one that does not iterate but writes everything at once. I don't know if such a thing is even possible.
Anyway, I need help on this, and if anyone could shed some light, I would be very thankful!
Upvotes: 2
Views: 3702
Reputation: 133975
First of all, you're going to be limited by the write speed. Typical write speed for a desktop machine is on the order of about 40 seconds per gigabyte. You need to write 4,000 gigabytes, so it's going to take on the order of 160,000 seconds (44.5 hours) just to write the output. The only way to reduce that time is to get a faster drive.
To make a 4 TB file by replicating a 14 GB file, you have to copy the original file 286 (actually 285.71) times. The simplest way to do things is:
open output file
starting_date = date on first transaction
for pass = 1 to 286
    open original file
    while not end of file
        read transaction
        replace date
        write to output
    end while
    increment date
end for
close output file
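As a rough Python rendering of that pseudocode (only a sketch -- the file names and the per-pass date handling below are assumptions, not code from this answer):

import datetime

PASSES = 286  # enough full copies to grow ~14 GB to ~4 TB

with open("DAILY_DDMAST_result.csv", "w") as out:
    for pass_no in range(PASSES):
        with open("DAILY_DDMAST.csv", "r") as src:
            header = src.readline()
            if pass_no == 0:
                out.write(header)  # keep the header only once
            for line in src:
                # shift the leading TIME_SK field by pass_no days
                date_str, rest = line.split(",", 1)
                new_date = (datetime.datetime.strptime(date_str, "%Y%m%d")
                            + datetime.timedelta(days=pass_no))
                out.write(new_date.strftime("%Y%m%d") + "," + rest)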
But with a typical read speed of about 20 seconds per gigabyte, that's 80,000 seconds (22 hours and 15 minutes) just for reading.
You can't do anything about the writing time, but you probably can reduce the reading time by a lot.
If you can buffer the whole 14 GB input file, then reading time becomes about five minutes.
If you don't have the memory to hold the 14 GB, consider reading it into a compressed memory stream. That CSV should compress quite well--to less than half of its current size. Then, rather than opening the input file every time through the loop, you just re-initialize a stream reader from the compressed copy of the file you're holding in memory.
In C#, I'd just use the MemoryStream and GZipStream classes. A quick Google search indicates that similar capabilities exist in Python, but since I'm not a Python programmer I can't tell you exactly how to use them.
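For what it's worth, the Python counterparts would be io.BytesIO and gzip.GzipFile. A minimal sketch of the idea, assuming the compressed copy fits in memory (the file name, chunk size, and pass count are placeholders):

import gzip
import io

# Read the source file once and keep a gzip-compressed copy in memory.
compressed = io.BytesIO()
with open("DAILY_DDMAST.csv", "rb") as src, \
        gzip.GzipFile(fileobj=compressed, mode="wb") as gz:
    while True:
        chunk = src.read(1024 * 1024)
        if not chunk:
            break
        gz.write(chunk)

# On each pass, rewind the in-memory buffer and stream the decompressed
# lines from RAM instead of re-reading the file from disk.
for pass_no in range(286):
    compressed.seek(0)
    with gzip.GzipFile(fileobj=compressed, mode="rb") as gz:
        for line in gz:
            pass  # replace the date here and write to the output file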
Upvotes: 2
Reputation: 6963
Batch writing your rows isn't really going to be an improvement because your write I/Os are still going to be the same size. Batching up writes only gives you an improvement if you can increase your I/O size, which reduces the number of system calls and allows the I/O system to deal with fewer but larger writes.
Honestly, I wouldn't complicate the code with batch writing for maintainability reasons, but I can certainly understand the desire to experiment with trying to improve the speed, if only for educational reasons.
What you want to do is batch up your writes -- batching up your csv rows doesn't really accomplish this.
[Example using StringIO removed .. there's a better way.]
Python's write() uses buffered I/O; by default it buffers at 4 KB (on Linux). If you open the file with a buffering parameter, you can make the buffer bigger:
with open("/tmp/x", "w", 1024*1024) as fd:
for i in range(0, 1000000):
fd.write("line %d\n" %i)
Then your writes will be 1 MB. strace output:
write(3, "line 0\nline 1\nline 2\nline 3\nline"..., 1048576) = 1048576
write(3, "ine 96335\nline 96336\nline 96337\n"..., 1048576) = 1048576
write(3, "1\nline 184022\nline 184023\nline 1"..., 1048576) = 1048576
write(3, "ne 271403\nline 271404\nline 27140"..., 1048576) = 1048576
write(3, "58784\nline 358785\nline 358786\nli"..., 1048576) = 1048576
write(3, "5\nline 446166\nline 446167\nline 4"..., 1048576) = 1048576
write(3, "ne 533547\nline 533548\nline 53354"..., 1048576) = 1048576
[...]
Your simpler original code will work; you only need to change the block size in the open() calls (I would change it for both source and destination).
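Applied to the original script, that would look something like the following (paths taken from the question; the 1 MB size is just an example):

with open('../csv/DAILY_DDMAST.csv', 'rb', 1024*1024) as csvinput:
    with open('../result/DAILY_DDMAST_result1'+name_interval+'.csv', 'wb', 1024*1024) as csvoutput:
        # ... reader/writer loop unchanged ...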
My other suggestion is to abandon csv, but that potentially carries some risk: if you have quoted strings with commas in them, you have to create the right kind of parser. BUT -- since the field you want to modify is fairly regular and is the first field, you may find it much simpler to just have a readline/write loop where you replace the first field and ignore the rest.
#!/usr/bin/python
import datetime
import re

with open("/tmp/out", "w", 1024*1024) as fdout, open("/tmp/in", "r", 1024*1024) as fdin:
    for i in range(0, 6):
        fdin.seek(0)
        for line in fdin:
            if i == 0:
                fdout.write(line)
                continue
            match = re.search(r"^(\d{8}),", line)
            if match:
                date = datetime.datetime.strptime(match.group(1), "%Y%m%d")
                fdout.write(re.sub(r"^\d{8},", (date + datetime.timedelta(days=i)).strftime("%Y%m%d,"), line))
            else:
                if line.startswith("TIME_SK,"):
                    continue
                raise Exception("Could not find /^\d{8},/ in '%s'" % line)
If order doesn't matter, then don't reread the file over and over:
#!/usr/bin/python
import datetime
import re

with open("/tmp/in", "r", 1024*1024) as fd, open("/tmp/out", "w", 1024*1024) as out:
    for line in fd:
        match = re.search(r"^(\d{8}),", line)
        if match:
            out.write(line)
            date = datetime.datetime.strptime(match.group(1), "%Y%m%d")
            for days in range(1, 6):
                out.write(re.sub(r"^\d{8},", (date + datetime.timedelta(days=days)).strftime("%Y%m%d,"), line))
        else:
            if line.startswith("TIME_SK,"):
                out.write(line)
                continue
            raise Exception("Could not find /^\d{8},/ in %s" % line)
I went ahead and profiled one of these with python -mcProfile and was surprised how much time was spent in strptime. Also try caching your strptime() calls with this memoized strptime():
_STRPTIME = {}

def strptime(s):
    if s not in _STRPTIME:
        _STRPTIME[s] = datetime.datetime.strptime(s, "%Y%m%d")
    return _STRPTIME[s]
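On Python 3, functools.lru_cache gives you the same memoization without the hand-rolled dictionary:

import datetime
import functools

@functools.lru_cache(maxsize=None)  # one cached parse per distinct date string
def strptime(s):
    return datetime.datetime.strptime(s, "%Y%m%d")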
Upvotes: 2
Reputation: 4572
I don't have a 14 GB file to try this with, so memory footprint is a concern. Someone who knows regex better than I do might have some performance-tweaking suggestions.
The main concept is: don't iterate through each line when it's avoidable. Let re do its magic on the whole body of text, then write that body to the file.
import re
newdate = "20150201,"
f = open('sample.csv', 'r')
g = open('result.csv', 'w')
body = f.read()
## keeps the original csv
g.write(body)
# strip off the header -- we already have one.
header, mainbody = body.split('\n', 1)
# replace all the dates
newbody = re.sub(r"20150131,", newdate, mainbody)
#end of the body didn't have a newline. Adding one back in.
g.write('\n' + newbody)
f.close()
g.close()
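If the real 14 GB input is too big to read() in one go, the same whole-body substitution could be applied one large chunk at a time. This is only a sketch of that variant; the shutil copy, the chunk size, and the line-boundary handling are assumptions added here, not part of the code above:

import re
import shutil

newdate = "20150201,"

# Pass 1: copy the original file verbatim.
shutil.copyfile('sample.csv', 'result.csv')

# Pass 2: append a date-shifted copy, substituting one big chunk at a time.
with open('sample.csv', 'r') as f, open('result.csv', 'a') as g:
    f.readline()       # skip the header; result.csv already has one
    g.write('\n')      # as above, the sample body had no trailing newline
    leftover = ''
    while True:
        block = f.read(64 * 1024 * 1024)   # ~64 MB of text per read
        if not block:
            break
        # substitute only up to the last complete line; keep the rest for later
        body, _, leftover = (leftover + block).rpartition('\n')
        if body:
            g.write(re.sub(r"20150131,", newdate, body) + '\n')
    if leftover:
        g.write(re.sub(r"20150131,", newdate, leftover) + '\n')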
Upvotes: 2