Reputation: 21
I've got a list of delayed dask arrays stored in dask_arr_ls that I want to turn into a dask dataframe. Here's a skeleton of my pipeline:
import numpy as np
import dask.array as da
import dask.dataframe as dd
from dask import delayed

def simulate_device_data(num_id):
    # create data for unknown number of timestamps
    data_ls = [unknown_qty*[num_id, time, lon, lat]]
    device_arr = np.stack(data_ls)
    device_dask_arr = da.from_array(device_arr, chunks=device_arr.size)
    return device_dask_arr

# build one delayed call per device
dask_arr_ls = []
for i_device in range(n_devices):
    i_dask_arr = delayed(simulate_device_data)(i_device)
    dask_arr_ls.append(i_dask_arr)

# wrap each delayed result as a dask array with an unknown number of rows
dask_arr_ls = [da.from_delayed(i_dask_arr, shape=(np.nan, 4), dtype=float)
               for i_dask_arr in dask_arr_ls]

ddf = dd.concat([dd.from_dask_array(i_darr) for i_darr in dask_arr_ls])
ddf.columns = ["num_id", "t", "lon", "lat"]
ddf.compute()
The compute() call produces the following error message:
ValueError: DataFrame constructor not properly called!
What am I doing wrong?
Upvotes: 0
Views: 431
Reputation: 21
I never did figure out what my error was with the code above. I suspect I was misusing delayed somehow. I modified my pipeline as follows to get it to work.
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask import delayed

def simulate_device_data(num_id):
    # create data for unknown number of timestamps
    data_ls = [unknown_qty*[num_id, time, lon, lat]]
    device_arr = np.stack(data_ls)
    device_df = pd.DataFrame(device_arr)
    return device_df

# build one delayed DataFrame per device
df_ls = []
for i_device in range(n_devices):
    i_df = delayed(simulate_device_data)(i_device)
    df_ls.append(i_df)

# empty DataFrame describing the expected columns and dtypes
archetype_df = pd.DataFrame(None, columns=["name", "num_id", "t", "lon", "lat"])
archetype_df = archetype_df.astype({"name": "object", "num_id": "int64", "t": "datetime64[ns]",
                                    "lon": "float64", "lat": "float64"},
                                   copy=False)

ddf = dd.from_delayed(df_ls, meta=archetype_df)
ddf.compute()
Upvotes: 0