Reputation: 79
I am facing a strange issue while reading a CSV file from a Google Cloud Storage bucket and writing it to a file in a different folder in the same bucket.
I have a CSV file named test.csv with 1,000,001 lines in it. I am trying to strip the " character from each line and write the result to a file called cleansed_test.csv.
I tested my code locally and it works as expected.
Below is the code I am using locally:
import pandas as pd
import csv
import re

new_lines = []
new_lines_error_less_cols = []
new_lines_error_more_cols = []

with open('c:\\Users\\test_file.csv', 'r') as f:
    lines = f.readlines()
    print(len(lines))
    for line in lines:
        new_line = re.sub('["]', '', line)
        new_line = new_line.strip()
        new_lines.append(new_line)
        # elif line.count('|') < 295:
        #     new_line_error_less = re.sub('["]', 'inches', line)
        #     new_line_error_less = new_line_error_less.strip()
        #     new_lines_error_less_cols.append(new_line_error_less)
        # else:
        #     new_line_error_more = re.sub('["]', 'inches', line)
        #     new_line_error_more = new_line_error_more.strip()
        #     new_lines_error_more_cols.append(new_line_error_more)

new_data = pd.DataFrame(new_lines)
print(new_data.info())
# new_data.to_csv('c:\\cleansed_file.csv', header=None, index=False, encoding='utf-8')
But when I try doing the same with the file in the GCS bucket, only 67514 rows are read.
This is the code I am using in Composer:
def replace_quotes(project, bucket, **context):
    import pandas as pd
    import numpy as np
    import csv
    import os
    import re
    import gcsfs
    import io

    fs = gcsfs.GCSFileSystem(project='project_name')
    updated_file_list = fs.ls('bucketname/FULL')
    updated_file_list = [x for x in updated_file_list if "filename" in x]
    new_lines = []
    new_lines_error_less_cols = []
    new_lines_error_more_cols = []
    for f in updated_file_list:
        file_name = os.path.splitext(f)[0]
        parse_names = file_name.split('/')
        filename = parse_names[2]
        bucketname = parse_names[0]
        with fs.open("gs://" + f, 'r') as pf:
            lines = pf.readlines()
            print("length of lines----->", len(lines))  # even here it shows 67514
            for line in lines:
                new_line = re.sub('["]', '', line)
                new_line = new_line.strip()
                new_lines.append(new_line)
    new_data = pd.DataFrame(new_lines)
    # new_data.to_csv("gs://"+bucketname+"/ERROR_FILES/cleansed_"+filename+".csv", escapechar='', header=None, index=False, encoding='utf-8', quoting=csv.QUOTE_NONE)
Also, in the bucket I see that the sizes of the files test.csv and cleansed_test.csv are the same.
The only thing I can think of is that, since files are compressed in GCS buckets, maybe I should be opening the files in a different way, because when I download the files to my local machine they are a lot larger than what I see in the bucket.
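To show what I mean by "a different way", something along these lines is what I was thinking of, i.e. checking whether the object is actually gzip-compressed and decompressing it while reading (just a rough sketch; the project, bucket and file names are placeholders from my code above):

import gzip
import io
import gcsfs

fs = gcsfs.GCSFileSystem(project='project_name')

# Peek at the first two bytes: gzip streams start with the magic number 0x1f 0x8b.
with fs.open('bucketname/FULL/test.csv', 'rb') as raw:
    magic = raw.read(2)
    raw.seek(0)
    if magic == b'\x1f\x8b':
        # Object is gzip-compressed despite the .csv name, so decompress on the fly.
        with gzip.open(raw, mode='rt', encoding='utf-8') as gz:
            lines = gz.readlines()
    else:
        lines = io.TextIOWrapper(raw, encoding='utf-8').readlines()

print(len(lines))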
Please advise.
Thanks.
Upvotes: 0
Views: 1373
Reputation: 79
For anyone curious, this is how to inflate a file that has the extension .csv but is actually compressed with gzip:
gsutil cat gs://BUCKET/File_Name.csv | zcat | gsutil cp - gs://BUCKET/Newfile.csv
The only issue I see here is that I can't use wildcards, or to put it plainly, we have to give the destination file name explicitly.
The downside is that, since I have to specify the destination file name, I cannot use it in a BashOperator in Airflow (this is what I think, I may be wrong).
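If the blocker is only the hard-coded destination name, one thing that might work (a rough sketch I have not run in Composer; the DAG id, task id and file name are placeholders) is templating the name through params in a BashOperator:

from datetime import datetime
from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Placeholder DAG just so the operator has something to attach to.
dag = DAG('inflate_csv_dag', start_date=datetime(2020, 1, 1), schedule_interval=None)

inflate_csv = BashOperator(
    task_id='inflate_csv',
    bash_command=(
        "gsutil cat gs://BUCKET/{{ params.name }}.csv | zcat | "
        "gsutil cp - gs://BUCKET/cleansed_{{ params.name }}.csv"
    ),
    params={'name': 'File_Name'},  # placeholder file name
    dag=dag,
)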
Thanks, anyway, hope this helps.
Upvotes: 1
Reputation: 132
I think you can achieve what you want by using the replace method of the DataFrame column object and passing regex=True (otherwise the string must match the whole cell exactly rather than just containing the character you want to replace). In this way you can simply iterate over each column and replace the unwanted string, rewriting each column with the newly modified one afterwards.
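For example (a tiny made-up DataFrame just to show the difference the regex flag makes):

import pandas as pd

df = pd.DataFrame({'size': ['24" monitor', '10" tablet']})

# Without regex=True nothing changes, because replace() only swaps cells that
# are exactly equal to '"'.
print(df['size'].replace(to_replace=['"'], value='').tolist())
# -> ['24" monitor', '10" tablet']

# With regex=True every occurrence of the quote character is removed.
print(df['size'].replace(to_replace=['"'], value='', regex=True).tolist())
# -> ['24 monitor', '10 tablet']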
I modified your code a bit and ran it on my VM in GCP. As you can see, I preferred to use the pandas.read_csv() method, as the gcsfs one was throwing me some errors. The code seems to be doing its job; I initially tested it by replacing a dummy common string and it worked smoothly.
This is your modified code. Please also note that I refactored the reading part a bit, as it did not properly match the path of the CSV in my bucket.
from pandas.api.types import is_string_dtype
import pandas as pd
import numpy as np
import csv
import os
import re
import gcsfs
import io

fs = gcsfs.GCSFileSystem(project='my-project')
updated_file_list = fs.ls('test-bucket/')
updated_file_list = [x for x in updated_file_list if "simple.csv" in x]
new_lines = []
new_lines_error_less_cols = []
new_lines_error_more_cols = []
for f in updated_file_list:
    file_name = os.path.splitext(f)[0]
    print(f, file_name)
    parse_names = file_name.split('/')
    filename = parse_names[1]
    bucketname = parse_names[0]
    with fs.open("gs://" + f) as pf:
        df = pd.read_csv(pf)
        # print(df.head(len(df)))  # To check results
    for col in df:
        if is_string_dtype(df[col]):
            df[col] = df[col].replace(to_replace=['"'], value='', regex=True)
    # print(df.head(len(df)))  # To check results
    new_data = pd.DataFrame(df)
    # new_data.to_csv("gs://"+bucketname+"/ERROR_FILES/cleansed_"+filename+".csv", escapechar='', header=None, index=False, encoding='utf-8', quoting=csv.QUOTE_NONE)
Hope my efforts solved your issue...
Upvotes: 1