jaimeardpm83_

Reputation: 25

Is it more efficient to read very large files line by line, or to read the whole file in one step, perhaps into a pandas DataFrame?

I have run my script for both use cases on an instance with 18 GB of RAM, 4 CPUs, and 20 GB of disk.

  1. My use case (read line by line):

Read the file line by line and, every 500,000 lines, hand the accumulated chunk to a child process that cleans the data (adds columns, converts to specific types), converts the chunk to Parquet, and uploads it to S3, all with pandas. To clarify: each 500,000-line chunk is processed concurrently by a child process (I am using multiprocessing).

The core of my script:

    import concurrent.futures
    import logging

    import boto3

    # cfg, params, fs_config, layer, s3_key_input, build_custom_scheme and
    # async_func come from the rest of the script (omitted here).
    _logger = logging.getLogger(__name__)
    s3r = boto3.resource("s3")

    s3_object = s3r.Object(bucket_name=cfg['BUCKETS']['INPUT_PATH'], key=s3_key_input)
    chunk = ""
    i = 0
    err = 0
    fs = []
    columns_name_list, sep = build_custom_scheme(s3r, params, fs_config, cfg)

    if sep == "\\t":
        sep = "\t"
    dataset_config = {"colunms": columns_name_list, "separator": sep}

    with concurrent.futures.ProcessPoolExecutor(max_workers=10) as executor:

        # Stream the S3 object line by line and accumulate a text buffer.
        for ln in s3_object.get()['Body'].iter_lines():
            try:
                i += 1
                chunk += str(ln, 'utf-8') + '\n'
            except Exception:
                err += 1
                _logger.warning("Number of encoding errors: {0}".format(err))

            # Every 500,000 lines, hand the buffer off to a child process.
            if i % 500000 == 0:
                _logger.info("Processing {0} records".format(i))
                fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))
                chunk = ""  # reset the buffer for the next chunk

        # Submit whatever is left over after the loop.
        _logger.info("Processing {0} records".format(i))
        fs.append(executor.submit(async_func, chunk, dataset_config, params, cfg, layer, i))

        for i, f in enumerate(concurrent.futures.as_completed(fs)):
            print(f"Process {i} - result {f.result()}")
  2. Case when reading the whole file with pandas (no multiprocessing):

    df = pd.concat(tchunk for tchunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, chunksize=5000, index_col=False))

The second case is far too slow on very large files, while the first one handles files of up to 5 GB in about 3 minutes on average. Should I stick with the first approach, or keep the second approach but switch to another library such as Dask?
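For reference, this is roughly what the second approach could look like with Dask (a sketch, not tested; it assumes the dask[dataframe] extra is installed, that tmp, columns_name, and separator are the same variables as above, and the output bucket name is made up):

    import dask.dataframe as dd

    # Each block becomes a partition that Dask can parse and process in parallel.
    ddf = dd.read_csv(
        tmp,
        sep=separator,
        names=columns_name,
        header=None,
        dtype=str,
        blocksize="64MB",
    )

    # Write the partitions straight to Parquet on S3 without collecting
    # everything into a single in-memory DataFrame first.
    ddf.to_parquet("s3://my-output-bucket/output/", write_index=False)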

Upvotes: 0

Views: 594

Answers (1)

BeRT2me

Reputation: 13242

Do you run into the same issues running the second case like this?

df = pd.DataFrame()
for chunk in pd.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, chunksize=500000, index_col=False):
    df = pd.concat([df, chunk])

It'll take some setup, but you could also try pyspark.pandas, which gives you the benefits of pyspark without having to learn something new:

import pyspark.pandas as ps

df = ps.read_csv(tmp, dtype=str, na_filter=False, names=columns_name, sep=separator, header=None, index_col=False)
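
If that route works for you, the resulting DataFrame can be written back out directly, for example (a sketch; the destination path is made up):

# Hypothetical follow-up: write the pyspark.pandas DataFrame to Parquet on S3.
df.to_parquet("s3://my-output-bucket/output/")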

Upvotes: 1
