Jim

Reputation: 13

Loading Cassandra Data into Dask Dataframe

I am trying to load data from a Cassandra database into a Dask dataframe. I have tried the following query with no success:

query="""SELECT * FROM document_table"""
df = man.session.execute(query)
df = dd.DataFrame(list(df)) 

TypeError                                 Traceback (most recent call last)
<ipython-input-135-021507f6f2ab> in <module>()
----> 1 a = dd.DataFrame(list(df))

    TypeError: __init__() missing 3 required positional arguments: 'name', 'meta', and 'divisions'

Does anybody know an easy way to load data directly from Cassandra into Dask? The data takes too much memory to load into pandas first.

Upvotes: 1

Views: 812

Answers (1)

mdurant

Reputation: 28683

Some problems with your code:

  • the line df = man.session.execute(query) presumably loads the whole data-set into memory; Dask is not invoked here and plays no part in this (someone with knowledge of the Cassandra driver could confirm).

  • list(df) on a dataframe produces a list of the column names and drops all the data (see the short illustration after this list).

  • dd.DataFrame, if you read the docs, is not constructed like this.
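
To illustrate the second point, a minimal sketch (the column names here are made up):

import pandas as pd

pdf = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
print(list(pdf))   # prints ['a', 'b'] -- iterating a DataFrame yields column labels, not rows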

What you probably want to do is: a) make a function that returns one partition of the data; b) delay this function and call it with each of the partition values; c) use dd.from_delayed to make the dask dataframe. E.g., assuming the table has a field partfield which handily has possible values 1..6 and a similar number of rows for each partition:

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def part(x):
    session = ...  # construct your Cassandra session here
    q = "SELECT * FROM document_table WHERE partfield={}".format(x)
    rows = session.execute(q)
    # each delayed call returns one pandas dataframe, i.e. one partition
    return pd.DataFrame(list(rows))

parts = [part(x) for x in range(1, 7)]
df = dd.from_delayed(parts)
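
If you know the schema up front, you can also pass meta to from_delayed so that Dask does not have to execute one of the delayed parts just to infer the column names and dtypes. The columns below are hypothetical; substitute the real ones from document_table:

# hypothetical schema -- replace with the actual columns/dtypes of document_table
meta = pd.DataFrame({"partfield": pd.Series(dtype="int64"),
                     "body": pd.Series(dtype="object")})
df = dd.from_delayed(parts, meta=meta)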

Upvotes: 1
