Reputation: 9537
What is the proper and fastest way to read Cassandra data into pandas? Now I use the following code but it's very slow...
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory
auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
auth_provider=auth_provider)
session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory
sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)
df = pd.DataFrame()
for row in session.execute(sql_query):
df = df.append(pd.DataFrame(row, index=[0]))
df = df.reset_index(drop=True).fillna(pd.np.nan)
Reading 1000 rows takes 1 minute, and I have a "bit more"... If I run the same query eg. in DBeaver, I get the whole results (~40k rows) within a minute.
Thank you!!!
Upvotes: 30
Views: 22082
Reputation: 81
Simply you can run a loop inside pandas DataFrame get job done!!
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
auth_provider=auth_provider)
session = cluster.connect(CASSANDRA_DB)
data = session.execute("SELECT * FROM <table_name>;")
df = pd.DataFrame([d for d in data])
Upvotes: 0
Reputation: 153
I used the row_factory solution for a few weeks, then hit datatype problems when trying to write the dataframe into another table with identical structure. Pandas guessed float
datatype for an int column with many empty fields. During write, the cassandra driver complained about type mismatch.
TypeError: Received an argument of invalid type for column "frequency". Expected: <class 'cassandra.cqltypes.Int32Type'>, Got: <class 'float'>; (required argument is not an integer)
Pandas int columns can't support NaN or None, so best option is probably make that column a python object.
A quick hack was tweaking pandas_factory to avoid pandas inference. Not an ideal blanket policy:
def pandas_factory(colnames, rows):
df = pd.DataFrame(rows, columns=colnames, dtype=object)
return df
I also found that I can do: df = pandas.DataFrame(result.all())
if I don't want the row factory.
As an interim solution, I'm wishing for a robust result_to_df()
function that uses result.column_types
(ex: cassandra.cqltypes.Int32Type
) and makes good guesses about translating those to python objects or numpy types. Will edit this answer if/when I get time to write that. Pandas read_cql
& to_cql
would be ideal, but probably beyond my bandwidth.
Upvotes: 1
Reputation: 1
Fastest way to read Cassandra data into pandas with automatic iteration of pages. Create dictionary and add each to it by automatically iterating all pages. Then, create dataframe with this dictionary.
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import dict_factory
auth_provider = PlainTextAuthProvider(username=CASSANDRA_USER, password=CASSANDRA_PASS)
cluster = Cluster(contact_points=[CASSANDRA_HOST], port=CASSANDRA_PORT,
auth_provider=auth_provider)
session = cluster.connect(CASSANDRA_DB)
session.row_factory = dict_factory
sql_query = "SELECT * FROM {}.{};".format(CASSANDRA_DB, CASSANDRA_TABLE)
dictionary ={"column1":[],"column2":[]}
for row in session.execute(sql_query):
dictionary["column1"].append(row.column1)
dictionary["column1"].append(row.column1)
df = pd.DataFrame(dictionary)
Upvotes: 0
Reputation: 49
I have been working on moving data from Cassandra to mssql, and used answers given here for the reference, I am able to move data but my source table in cassandra is huge and my query is getting timeout error from cassandra , the thing is we cannot increase the timeout and I am only left with the option of selecting rows in batches in my query, my code is also converting the cassandra collection data types to str as I want to insert those in mssql and then parse it, please let me know if anyone faces similar issue, the code I built is given below:
import sys
import pandas as pd
import petl as etl
import pyodbc
import sqlalchemy
from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster
from sqlalchemy import *
from cassandra.query import SimpleStatement
def pandas_factory(colnames, rows):
return pd.DataFrame(rows, columns=colnames)
engine = sqlalchemy.create_engine('sql_server_connection string')
cluster = Cluster(
contact_points=['cassandra_host'],
auth_provider = PlainTextAuthProvider(username='username', password='passwrd')
)
session = cluster.connect('keyspace',wait_for_all_pools=True)
session.row_factory = pandas_factory
request_timeout = 60000
query = "SELECT * FROM cassandratable"
statement = SimpleStatement(query, fetch_size=5000)
rows = session.execute(statement)
df = rows._current_rows
df['attributes'] = df.attributes.astype(str)
df['attributesgenerated'] = df.attributesgenerated.astype(str)
df['components'] = df.components.astype(str)
df['distributioncenterinfo'] = df.distributioncenterinfo.astype(str)
df['images'] = df.images.astype(str)
df['itemcustomerzonezoneproductids'] =
df.itemcustomerzonezoneproductids.astype(str)
df['itempodconfigids'] = df.itempodconfigids.astype(str)
df['keywords'] = df.keywords.astype(str)
df['validationmessages'] = df.validationmessages.astype(str)
df['zones'] = df.zones.astype(str)
#error_bad_lines=False
#print(df)
df.to_sql(
name='mssql_table_name',
con=engine,
index=False,
if_exists='append',
chunksize=1
)
Upvotes: 0
Reputation: 1233
What I do (in python 3) is :
query = "SELECT ..."
df = pd.DataFrame(list(session.execute(query)))
Upvotes: 17
Reputation: 9537
I got the answer at the official mailing list (it works perfectly):
Hi,
try to define your own pandas row factory:
def pandas_factory(colnames, rows): return pd.DataFrame(rows, columns=colnames) session.row_factory = pandas_factory session.default_fetch_size = None query = "SELECT ..." rslt = session.execute(query, timeout=None) df = rslt._current_rows
That's the way i do it - an it should be faster...
If you find a faster method - i'm interested in :)
Michael
Upvotes: 58