ragesz

Reputation: 9537

Python read Cassandra data into pandas

What is the proper and fastest way to read Cassandra data into pandas? Currently I use the following code, but it's very slow...

import numpy as np
import pandas as pd

from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

df = pd.DataFrame()

# Builds a one-row DataFrame per result row and appends it to df
for row in session.execute(sql_query):
    df = df.append(pd.DataFrame(row, index=[0]))

df = df.reset_index(drop=True).fillna(np.nan)

Reading 1,000 rows takes a minute, and I have a "bit more"... If I run the same query in, e.g., DBeaver, I get the whole result set (~40k rows) within a minute.

Thank you!!!

Upvotes: 30

Views: 22082

Answers (6)

Lakshman

Reputation: 81

You can simply run a loop over the result set inside the pandas DataFrame constructor to get the job done:

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
        auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
data = session.execute("SELECT * FROM <table_name>;")

# With the driver's default row factory each row is a namedtuple,
# so pandas picks up the column names automatically.
df = pd.DataFrame([d for d in data])

Upvotes: 0

JosiahJohnston

Reputation: 153

I used the row_factory solution for a few weeks, then hit datatype problems when trying to write the DataFrame into another table with an identical structure. Pandas guessed a float datatype for an int column with many empty fields, and during the write the Cassandra driver complained about the type mismatch:

TypeError: Received an argument of invalid type for column "frequency". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'float'>; (required argument is not an integer)

Pandas int columns can't hold NaN or None, so the best option is probably to make that column a Python object.
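To see the upcast concretely, here is a tiny standalone illustration (the values are made up):

import pandas as pd

# An integer column with a missing value is silently upcast to float64,
# which is what later trips the Int32Type check on write.
s = pd.Series([1, 2, None])
print(s.dtype)      # float64

# Keeping the column as Python objects preserves the ints (and the None):
s_obj = pd.Series([1, 2, None], dtype=object)
print(s_obj.dtype)  # object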

A quick hack is to tweak pandas_factory to avoid pandas' type inference entirely. It's not an ideal blanket policy:

def pandas_factory(colnames, rows):
    # dtype=object keeps every column as Python objects, bypassing inference
    df = pd.DataFrame(rows, columns=colnames, dtype=object)
    return df

I also found that I can do df = pd.DataFrame(result.all()) if I don't want the row factory.

As an interim solution, I'm wishing for a robust result_to_df() function that uses result.column_types (e.g. cassandra.cqltypes.Int32Type) and makes good guesses about translating those to Python objects or numpy types. I will edit this answer if/when I get time to write that. Pandas read_cql and to_cql would be ideal, but are probably beyond my bandwidth.
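In the meantime, here is a rough, untested sketch of what such a helper could look like; the cqltypes-to-dtype table is an illustrative guess, not a complete mapping:

import numpy as np
import pandas as pd

# Hypothetical mapping from cqltypes class names to target dtypes.
# Integer and boolean columns stay object so None values survive.
_CQL_TO_DTYPE = {
    'Int32Type': object,
    'LongType': object,
    'BooleanType': object,
    'FloatType': np.float32,
    'DoubleType': np.float64,
}

def result_to_df(result):
    # Build everything as object first to sidestep pandas' own inference;
    # result.all() returns namedtuples, so column names come for free.
    df = pd.DataFrame(result.all(), dtype=object)
    for name, cqltype in zip(df.columns, result.column_types):
        dtype = _CQL_TO_DTYPE.get(cqltype.__name__, object)
        if dtype is not object:
            df[name] = df[name].astype(dtype)
    return df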

Upvotes: 1

KRISHNA

Reputation: 1

A fast way to read Cassandra data into pandas, letting the driver iterate through pages automatically: create a dictionary of column lists, append to it row by row (the driver transparently fetches each page as you iterate), then build the DataFrame from the dictionary.

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
    auth_provider=auth_provider)

session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory

sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)

dictionary = {"column1": [], "column2": []}

# dict_factory returns each row as a dict keyed by column name
for row in session.execute(sql_query):
    dictionary["column1"].append(row["column1"])
    dictionary["column2"].append(row["column2"])

df = pd.DataFrame(dictionary)

Upvotes: 0

Alok Garg

Reputation: 49

I have been working on moving data from Cassandra to MSSQL, using the answers given here as a reference. I am able to move data, but my source table in Cassandra is huge and my query hits Cassandra's timeout. Since we cannot increase the timeout, I am left with the option of selecting rows in batches. My code also converts the Cassandra collection datatypes to str, as I want to insert those into MSSQL and then parse them. Please let me know if anyone faces a similar issue; the code I built is given below:

import pandas as pd
import sqlalchemy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement


def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

engine = sqlalchemy.create_engine('sql_server_connection string')

cluster = Cluster(
    contact_points=['cassandra_host'], 
    auth_provider = PlainTextAuthProvider(username='username', password='passwrd')
)

session = cluster.connect('keyspace', wait_for_all_pools=True)

session.row_factory = pandas_factory
session.default_timeout = 60000  # per-request timeout, in seconds
query = "SELECT * FROM cassandratable"
statement = SimpleStatement(query, fetch_size=5000)
rows = session.execute(statement)

# With paging enabled, _current_rows holds only the first fetched
# page (up to fetch_size rows); see the pagination sketch below.
df = rows._current_rows
# Convert the Cassandra collection columns to str for MSSQL
collection_cols = [
    'attributes', 'attributesgenerated', 'components',
    'distributioncenterinfo', 'images', 'itemcustomerzonezoneproductids',
    'itempodconfigids', 'keywords', 'validationmessages', 'zones',
]
for col in collection_cols:
    df[col] = df[col].astype(str)
df.to_sql(
    name='mssql_table_name',
    con=engine,
    index=False,
    if_exists='append',
    chunksize=1,  # one row per INSERT; a larger chunksize is usually much faster
)
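To actually pull every batch rather than just the first page, one option (a sketch, assuming the same pandas_factory row factory as above) is to drain the pages with the driver's has_more_pages / fetch_next_page():

# Each fetched page materializes as a DataFrame via pandas_factory;
# collect them all and concatenate at the end.
pages = [rows._current_rows]
while rows.has_more_pages:
    rows.fetch_next_page()            # synchronously pulls the next page
    pages.append(rows._current_rows)
df = pd.concat(pages, ignore_index=True)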

Upvotes: 0

George C

Reputation: 1233

What I do (in Python 3) is:

query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))
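Note that list(session.execute(query)) drives the driver's automatic paging to completion, so every row is materialized as a namedtuple before pandas builds the frame. It is the most concise option; for very large result sets, the row-factory approach in the answer below hands each fetched page to pandas directly instead.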

Upvotes: 17

ragesz

Reputation: 9537

I got the answer on the official mailing list (it works perfectly):

Hi,

try to define your own pandas row factory:

def pandas_factory(colnames, rows):
    return pd.DataFrame(rows, columns=colnames)

session.row_factory = pandas_factory
session.default_fetch_size = None

query = "SELECT ..."
rslt = session.execute(query, timeout=None)
df = rslt._current_rows

That's the way I do it - and it should be faster...

If you find a faster method - I'm interested in it :)

Michael
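For completeness, a minimal end-to-end version of that advice, reusing the connection setup from my question (note that _current_rows is a private driver attribute; with default_fetch_size = None paging is disabled, so it holds the entire result):

import pandas as pd
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

def pandas_factory(colnames, rows):
    # The driver hands each fetched page straight to this factory,
    # so the DataFrame is built without per-row Python loops.
    return pd.DataFrame(rows, columns=colnames)

auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
                  auth_provider=auth_provider)
session = cluster.connect(CASSANDRA_DB)

session.row_factory = pandas_factory
session.default_fetch_size = None   # disable paging: whole result in one page

query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)
df = session.execute(query, timeout=None)._current_rows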

Upvotes: 58
