Reputation: 25
I have run my script on an instance with 18 GB of RAM, 4 CPUs, and 20 GB of disk for both use cases.
The first use case reads the file line by line and processes every 500,000 lines: the script cleans the data (adds columns, converts to specific types), converts the chunk to a particular file type (Parquet), and loads the data to S3, all done with the pandas library. I want to clarify that each 500,000-line chunk is processed concurrently by a child process (I am using multiprocessing).
The core of my script (first use case):
s3_object = s3r.Object(bucket_name=cfg['BUCKETS']['INPUT_PATH'], key=s3_key_input)
chunk = ""
i = 0
err = 0
fs = []
columns_name_list, sep = build_custom_scheme(s3r, params, fs_config, cfg)
if sep == "\\t":
    sep = "\t"
dataset_config = {"colunms": columns_name_list, "separator": sep}

with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:
    for ln in s3_object.get()['Body'].iter_lines():
        try:
            i += 1
            chunk += str(ln, 'utf-8') + '\n'
        except Exception as e:
            err += 1
            _logger.warning("Number of encoding errors {0}: ".format(err))
        # every 500,000 lines, hand the accumulated chunk to a child process
        if i % 500000 == 0:
            _logger.info("Processing {0} records: ".format(i))
            fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
            del chunk
            chunk = ""
    # submit whatever is left over after the loop
    _logger.info("Processing {0} records: ".format(i))
    fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
    for i, f in enumerate(concurrent.futures.as_completed(fs)):
        print(f"Process {i} - result {f}")
The second use case reads the file with pandas in chunks and concatenates them into a single DataFrame:

df = pd.concat(
    tchunk for tchunk in pd.read_csv(
        tmp, dtype=str, na_filter=False, names=columns_name,
        sep=separator, header=None, chunksize=5000, index_col=False
    )
)
The second case is too slow when reading very large files, while the first use case handles files of up to 5 GB in an average time of 3 minutes. Should I stick with the first use case, or keep the second use case but with another library such as Dask?
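For reference, the Dask variant I have in mind would look roughly like this (a sketch only; it assumes the same columns_name and separator variables, that dask and s3fs are installed, and the output path is a placeholder):

import dask.dataframe as dd

# read lazily in partitions instead of one giant pandas DataFrame
ddf = dd.read_csv(
    tmp,
    dtype=str,
    na_filter=False,
    names=columns_name,
    sep=separator,
    header=None,
    blocksize="64MB",   # partition size; tune to the instance's memory
)
# write the partitions out as Parquet (path is a placeholder)
ddf.to_parquet("s3://my-bucket/output/", write_index=False)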
Upvotes: 0
Views: 594
Reputation: 13242
Do you run into the same issues running the second case like this?
df = pd.DataFrame()
for chunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, chunksize=500000, index_col=False):
df = pd.concat([df, chunk])
It'll take some setup to implement, but you could also try pyspark.pandas, giving you the benefits of pyspark while not having to learn something new:
import pyspark.pandas as ps
df = ps.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, index_col=False)
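Since the end goal is Parquet on S3, the same DataFrame can then be written back out; a sketch, assuming the Spark session is configured for S3 access (hadoop-aws/s3a) and with a placeholder output path:

# write the result as Parquet; the s3a:// path is a placeholder
df.to_parquet("s3a://my-bucket/output/", mode="overwrite")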
Upvotes: 1