Reputation: 1402
I add a column to a DataFrame by applying a function to an existing column, for example:
import json

def myfunc(x):
    resp_data = {'status': '1', 'data': x}
    return json.dumps(resp_data)
The original Pandas DataFrame df is converted into a Dask DataFrame as follows:
import dask.dataframe as dd
ddf = dd.from_pandas(df, npartitions=30)
Now I call the function myfunc on ddf to add a new column data_json using the existing column att, as follows:
ddf['data_json'] = ddf.apply(lambda row:myfunc(row['att']),
axis=1, result_type='expand', meta=(None, 'str'))
When I call ddf.compute(), it breaks with this error:
AttributeError: 'Series' object has no attribute 'columns'
I need to save ddf to a file after calling compute(), using:
ddf.to_csv("myfile.csv", index=False, single_file=True)
How can I handle the error so that the rows that raise it are skipped, and processing and saving of the Dask DataFrame continue?
Upvotes: 1
Views: 600
Reputation: 16551
A few suggestions:
- If your function is simple, it is not necessary to pass the series as an argument, so something like ddf.apply(myfunc, axis=1) should work. If the function needs multiple columns, the body of the function should specify how to handle them.
- It turns out that json doesn't like numpy dtypes, so each value needs to be converted with int before dumping.
- If the dataframe is saved to csv, there is no need to call .compute on it first, as that would involve doing the same work twice.
- If myfunc does not depend on the neighbouring rows, one could also use .map_partitions.
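To illustrate the point about json and numpy dtypes, here is a minimal sketch. It assumes the value reaching the function is a numpy.int64 scalar, which is typical for an integer column; the variable names are only for illustration.

```python
import json

import numpy as np

# Assumption: this mimics the kind of scalar a row-wise function receives
# from an integer column.
value = np.int64(3)

try:
    json.dumps({"data": value})
    numpy_serialized = True
except TypeError:
    # The json module only handles built-in Python types,
    # so a numpy integer is rejected.
    numpy_serialized = False

# Converting to a built-in int first makes the value serializable.
converted = json.dumps({"data": int(value)})
```

Other numpy types may behave differently (numpy.float64, for instance, subclasses Python's float), so it is worth checking the dtype of the column being serialized.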
import json
import dask.dataframe as dd
import pandas as pd
ddf = dd.from_pandas(pd.DataFrame(range(5), columns=["x"]), npartitions=2)
ddf["y"] = 2 * ddf["x"]
def myfunc(row):
    """content of the function should specify how to handle different columns"""
    resp_data = {
        "status": "1",
        "y": int(row["y"]),
        "diff_data": int(row["y"] - row["x"]),
    }
    return json.dumps(resp_data)
ddf["data_json"] = ddf.apply(myfunc, axis=1, result_type="expand", meta=(None, "str"))
print(ddf.compute())
# x y data_json
# 0 0 0 {"status": "1", "y": 0, "diff_data": 0}
# 1 1 2 {"status": "1", "y": 2, "diff_data": 1}
# 2 2 4 {"status": "1", "y": 4, "diff_data": 2}
# 3 3 6 {"status": "1", "y": 6, "diff_data": 3}
# 4 4 8 {"status": "1", "y": 8, "diff_data": 4}
# if the dataframe only needs to be saved, there is no need for separate .compute
# ddf.to_csv("myfile.csv", index=False, single_file=True)
Upvotes: 1