Reputation: 13401
I have a dataframe as below:
df
date        time      open  high  low  last
01-01-2017  11:00:00  37    45    36   42
01-01-2017  11:23:00  36    43    33   38
01-01-2017  12:00:00  45    55    35   43
....
I want to write it into Cassandra. It's kind of a bulk upload after processing the data in Python.
The schema for cassandra is as below:
CREATE TABLE ks.table1 (
    date text,
    time text,
    open float,
    high float,
    low float,
    last float,
    PRIMARY KEY (date, time)
)
To insert a single row into Cassandra we can use cassandra-driver in Python, but I couldn't find any details about uploading an entire dataframe.
from cassandra.cluster import Cluster

cluster = Cluster(['ip_address'])   # contact point of the cluster
session = cluster.connect('ks')
session.execute(
    """
    INSERT INTO ks.table1 (date, time, open, high, low, last)
    VALUES ('01-01-2017', '11:00:00', 37, 45, 36, 42)
    """)
P.S.: A similar question has been asked earlier, but it doesn't answer my question.
Upvotes: 3
Views: 8258
Reputation: 89
Iterating the DataFrame as shown in the first answer doesn't seem to work as expected anymore. Here is how I access the data in the df (the example below uses a Cassandra DB):
columns = list(df.columns.values)
query = "INSERT INTO {} ({}) VALUES ({})".format(table, ','.join(columns), ','.join('?' for _ in columns))
preparedquery = self.session.prepare(query)
for _, row in df.iterrows():
    values = [row[col] for col in columns]
    self.session.execute(preparedquery, values)
Iterating over df.loc directly raised a "not in range" error at the last row (it looks like a pandas quirk), which is why the loop above uses iterrows().
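If you are not working inside a class, a minimal standalone sketch of the same idea (the contact point, keyspace and table name below are placeholders, not taken from the original post):
from cassandra.cluster import Cluster

cluster = Cluster(['ip_address'])   # placeholder contact point
session = cluster.connect('ks')     # keyspace from the question
table = 'table1'

columns = list(df.columns.values)
query = "INSERT INTO {} ({}) VALUES ({})".format(table, ','.join(columns), ','.join('?' for _ in columns))
prepared = session.prepare(query)

# one execute per DataFrame row, values taken in column order
for _, row in df.iterrows():
    session.execute(prepared, [row[col] for col in columns])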
Upvotes: 0
Reputation: 1562
A nice option is to use batches. First you can split the df into even partitions (thanks to Python/Pandas - partitioning a pandas DataFrame in 10 disjoint, equally-sized subsets) and then insert each partition as a batch into Cassandra. Batch size is limited by a Cassandra (cassandra.yaml) setting:
batch_size_fail_threshold_in_kb: 50
The code for batch insert of Pandas df:
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import BatchStatement
import numpy as np

CASSANDRA_PARTITION_NUM = 1500

def write_to_cassandra(df):
    cassandra_cluster = Cluster(['ip'])
    session = cassandra_cluster.connect('keyspace')
    prepared_query = session.prepare('INSERT INTO users(id, name) VALUES (?,?)')
    for partition in split_to_partitions(df, CASSANDRA_PARTITION_NUM):
        batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
        for index, item in partition.iterrows():
            batch.add(prepared_query, (item.id, item.name))
        session.execute(batch)

def split_to_partitions(df, partition_number):
    permuted_indices = np.random.permutation(len(df))
    partitions = []
    for i in range(partition_number):
        partitions.append(df.iloc[permuted_indices[i::partition_number]])
    return partitions
Update: do this only when all statements in a batch target the same Cassandra partition.
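Since the question's table uses date as the partition key, one way to respect that constraint is to build one batch per date group rather than per random split. A minimal sketch, assuming the df and the ks.table1 schema from the question (the contact point is a placeholder):
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement

cluster = Cluster(['ip'])
session = cluster.connect('ks')
prepared = session.prepare(
    'INSERT INTO table1 (date, time, open, high, low, last) VALUES (?,?,?,?,?,?)')

# group rows by the partition key so every batch stays within one partition
for date_value, group in df.groupby('date'):
    batch = BatchStatement()
    for row in group.itertuples(index=False):
        batch.add(prepared, (row.date, row.time, row.open, row.high, row.low, row.last))
    session.execute(batch)
A very large date group can still hit the batch_size_fail_threshold_in_kb limit, so huge groups may need to be chunked further.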
Upvotes: 0
Reputation: 86
Even I was facing this problem, but I figured out that even uploading millions of rows (19 million to be exact) into Cassandra didn't take much time.
Coming to your problem, you can use the Cassandra bulk loader to get your job done.
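The exact bulk-loading commands aren't shown here; one common route is to dump the df to CSV and load it with cqlsh's COPY FROM (the file name below is just an example):
# export the DataFrame, then load the CSV with cqlsh, e.g.:
#   cqlsh -e "COPY ks.table1 (date, time, open, high, low, last) FROM 'df_dump.csv' WITH HEADER = TRUE"
df.to_csv('df_dump.csv', index=False, columns=['date', 'time', 'open', 'high', 'low', 'last'])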
EDIT 1:
You can use prepared statements to help upload data into a Cassandra table while iterating through the dataFrame.
from cassandra.cluster import Cluster

cluster = Cluster([ip_address])
session = cluster.connect(keyspace_name)
query = "INSERT INTO data(date,time,open,high,low,last) VALUES (?,?,?,?,?,?)"
prepared = session.prepare(query)
"?" is used to input variables
for item in dataFrame.itertuples(index=False):
    session.execute(prepared, (item.date, item.time, item.open, item.high, item.low, item.last))
or, equivalently, by position:
for item in dataFrame.itertuples(index=False):
    session.execute(prepared, (item[0], item[1], item[2], item[3], item[4], item[5]))
What I mean is: use a for loop to extract the data and upload it with session.execute().
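Row-by-row session.execute() calls are synchronous; for more throughput the driver also provides helpers in cassandra.concurrent that keep many inserts in flight at once. A minimal sketch reusing the prepared statement above (the concurrency value is just an example):
from cassandra.concurrent import execute_concurrent_with_args

# one parameter tuple per DataFrame row, in column order
params = [tuple(row) for row in dataFrame.itertuples(index=False)]

# runs up to `concurrency` inserts in parallel and collects the results
results = execute_concurrent_with_args(session, prepared, params, concurrency=100)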
For more info, see the documentation on prepared statements.
Hope this helps.
Upvotes: 6