Reputation: 13
I am trying to load data from a cassandra database into a Dask dataframe. I have tried querying the following with no success:
query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df))
TypeError Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))
TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'
Does anybody know an easy way to load data directly from Cassandra into Dask? It is too much memory too load into pandas first.
Upvotes: 1
Views: 812
Reputation: 28683
Some problems with your code:
the line df =
presumably loads the whole data-set into memory. Dask is not invoked here, it plays no part in this. Someone with knowledge of the Cassandra driver can confirm this.
list(df)
produces a list of the column names of a dataframe and drops all the data
dd.DataFrame
, if you read the docs is not constructed like this.
What you probably want to do is a) make a function that returns one partition of the data, b) delay this function and call with the various values of the partitions c) use dd.from_delayed
to make the dask dataframe. E.g., assuming the table has a field partfield
which handily has possible values 1..6 and similar number of rows for each partition:
@dask.delayed
def part(x):
session = # construct Cassandra session
q = "SELECT * FROM document_table WHERE partfield={}".format(x)
df = man.session.execute(query)
return dd.DataFrame(list(df))
parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)
Upvotes: 1