Gimbo

Reputation: 41

Apply json.loads to a dataframe column with dask

I have a dataframe fulldb_accrep_united that looks like this:

   SparkID  ...                                             Period
0   913955  ...  {"@PeriodName": "2000", "@DateBegin": "2000-01...
1   913955  ...  {"@PeriodName": "1999", "@DateBegin": "1999-01...
2    16768  ...  {"@PeriodName": "2007", "@DateBegin": "2007-01...
3    16768  ...  {"@PeriodName": "2006", "@DateBegin": "2006-01...
4    16768  ...  {"@PeriodName": "2005", "@DateBegin": "2005-01...

I need to convert the Period column, which is currently a column of strings, into a column of JSON values. Usually I do this with df.apply(lambda x: json.loads(x)), but this dataframe is too large to process as a whole. I want to use dask, but I seem to be missing something important. I think I don't understand how to use apply in dask, and I can't work out the solution.

The code

This is how I would do it with pandas, with the whole df in memory:

#%% read df
import json
import os

import pandas as pd

os.chdir('/opt/data/.../download finance/output')
fulldb_accrep_united = pd.read_csv('fulldb_accrep_first_download_raw_quotes_corrected.csv', index_col=0, encoding='utf-8')
os.chdir('..')

#%% Delete some freaky symbols from the column
condition = fulldb_accrep_united['Period'].str.contains('\\xa0', na=False, regex=False)
fulldb_accrep_united.loc[condition.values, 'Period'] = fulldb_accrep_united.loc[condition.values, 'Period'].str.replace('\\xa0', ' ', regex=False).values

#%% Convert to json
fulldb_accrep_united.loc[fulldb_accrep_united['Period'].notnull(), 'Period'] = fulldb_accrep_united['Period'].dropna().apply(lambda x: json.loads(x))

This is the code where I try to use dask:

#%% load data with dask
import dask.dataframe as dd

os.chdir('/opt/data/.../download finance/output')
fulldb_accrep_united = dd.read_csv('fulldb_accrep_first_download_raw_quotes_corrected.csv', encoding='utf-8', blocksize=16 * 1024 * 1024)  # 16 MB chunks
os.chdir('..')

#%% set up the calculation graph. No work is done here.
def transform_to_json(df):
    condition = df['Period'].str.contains('\\xa0', na=False, regex=False)
    df['Period'] = df['Period'].mask(condition.values, df['Period'][condition.values].str.replace('\\xa0', ' ', regex=False).values)

    condition2 = df['Period'].notnull()
    df['Period'] = df['Period'].mask(condition2.values, df['Period'].dropna().apply(lambda x: json.loads(x)).values)

result = transform_to_json(fulldb_accrep_united)

The last cell here gives an error:

NotImplementedError: Series getitem in only supported for other series objects with matching partition structure

What am I doing wrong? I have been trying to find similar topics for almost 5 hours, but I think I am missing something important, because I am new to the topic.

Upvotes: 3

Views: 1679

Answers (1)

MRocklin

Reputation: 57271

Your question was long enough that I didn't read through all of it. My apologies. See https://stackoverflow.com/help/minimal-reproducible-example

However, based on the title, it may be that you want to apply the json.loads function across every element in a dataframe's column:

df["column-name"] = df["column-name"].apply(json.loads)

Upvotes: 1
