S_S

Reputation: 1402

Dask compute on dataframe to add column returns AttributeError

I have a function that I use to add a column to a DataFrame, for example:

    import json

    def myfunc(x):
        resp_data = {'status': '1', 'data': x}
        return json.dumps(resp_data)

The original pandas DataFrame df is converted into a Dask DataFrame as follows:

    import dask.dataframe as dd
    ddf = dd.from_pandas(df, npartitions=30)

Now I call myfunc on ddf to add a new column data_json based on the existing column att:

    ddf['data_json'] = ddf.apply(lambda row: myfunc(row['att']),
                                 axis=1, result_type='expand', meta=(None, 'str'))

When I call ddf.compute() it breaks with this error

    AttributeError: 'Series' object has no attribute 'columns'

I need to save ddf to a file after calling compute() using

    ddf.to_csv("myfile.csv", index=False, single_file=True)

How can I handle the error so that the rows that cause it are skipped, and the rest of the Dask DataFrame is still processed and saved?

Upvotes: 1

Views: 600

Answers (1)

SultanOrazbayev

Reputation: 16551

A few suggestions:

  • if your function is simple, then it is not necessary to wrap it in a lambda and pass the column value as an argument, so something like ddf.apply(myfunc, axis=1) should work. If the function needs multiple columns, then the body of the function should specify how to handle them.

  • it turns out json doesn't like numpy dtypes, so before dumping, the values need to be converted to native Python types, e.g. with int.

  • if the dataframe is only being saved to csv, then there is no need to .compute it first, as that would do the same work twice.

  • if myfunc does not depend on the neighbouring rows, one could also use .map_partitions (see the sketch after the example below).

import json

import dask.dataframe as dd
import pandas as pd

ddf = dd.from_pandas(pd.DataFrame(range(5), columns=["x"]), npartitions=2)
ddf["y"] = 2 * ddf["x"]


def myfunc(row):
    """content of the function should specify how to handle different columns"""
    resp_data = {
        "status": "1",
        "y": int(row["y"]),
        "diff_data": int(row["y"] - row["x"]),
    }
    return json.dumps(resp_data)


ddf["data_json"] = ddf.apply(myfunc, axis=1, result_type="expand", meta=(None, "str"))

print(ddf.compute())
#    x  y                                data_json
# 0  0  0  {"status": "1", "y": 0, "diff_data": 0}
# 1  1  2  {"status": "1", "y": 2, "diff_data": 1}
# 2  2  4  {"status": "1", "y": 4, "diff_data": 2}
# 3  3  6  {"status": "1", "y": 6, "diff_data": 3}
# 4  4  8  {"status": "1", "y": 8, "diff_data": 4}

# if the dataframe only needs to be saved, there is no need for separate .compute
# ddf.to_csv("myfile.csv", index=False, single_file=True)
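
Regarding the last bullet, here is a minimal sketch of the .map_partitions variant, reusing myfunc and the toy ddf from above (the add_json_column helper and the dtypes in meta are illustrative assumptions):

def add_json_column(pdf):
    """Runs on a single partition, which is a plain pandas DataFrame."""
    pdf = pdf.copy()
    pdf["data_json"] = pdf.apply(myfunc, axis=1)
    return pdf


# start again from the two original columns and let map_partitions add the third;
# meta describes the output so Dask does not have to guess it
ddf_mp = ddf[["x", "y"]].map_partitions(
    add_json_column,
    meta={"x": "int64", "y": "int64", "data_json": "object"},
)

# ddf_mp.to_csv("myfile.csv", index=False, single_file=True)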

Upvotes: 1
