Reputation: 983
I have written a Python script that reads rows from a CSV file and inserts them into Cassandra. It runs fine at first, but after a certain number of insertions it hits a timeout error.
# lets do some batch insert
def insert_data(self):
    start_time = datetime.utcnow()
    destination = "/Users/aviralsrivastava/dev/learning_dask/10M_rows.csv"
    chunksize = 1000
    chunks = pd.read_csv(destination, chunksize=chunksize)
    chunk_counter = 0
    for df in chunks:
        df = df.to_dict(orient='records')
        chunk_counter += 1
        batch = BatchStatement()
        for row in df:
            key = str(row["0"])
            row = json.dumps(row, default=str)
            insert_sql = self.session.prepare(
                (
                    "INSERT INTO {} ({}, {}, {}) VALUES (?,?,?)"
                ).format(
                    self.table_name, "id", "version", "row"
                )
            )
            batch.add(insert_sql, (key, "version_1", row))
        self.session.execute(batch)
        self.log.info("One chunk's Batch Insert Completed")
        print(
            str(chunk_counter * chunksize) + " : " +
            str(datetime.utcnow() - start_time)
        )
        del batch
    print("Complete task's duration is: {}".format(
        datetime.utcnow() - start_time))
The code for establishing a connection is as follows:
def createsession(self):
    self.cluster = Cluster(['localhost'], connect_timeout=50)
    self.session = self.cluster.connect(self.keyspace)
And the error is:
2019-01-16 15:58:49,013 [ERROR] cassandra.cluster: Error preparing query:
Traceback (most recent call last):
File "cassandra/cluster.py", line 2402, in cassandra.cluster.Session.prepare
File "cassandra/cluster.py", line 4062, in cassandra.cluster.ResponseFuture.result
cassandra.OperationTimedOut: errors={'127.0.0.1': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1
Traceback (most recent call last):
File "getting_started.py", line 107, in <module>
example1.insert_data()
File "getting_started.py", line 86, in insert_data
self.table_name, "id", "version", "row"
File "cassandra/cluster.py", line 2405, in cassandra.cluster.Session.prepare
File "cassandra/cluster.py", line 2402, in cassandra.cluster.Session.prepare
File "cassandra/cluster.py", line 4062, in cassandra.cluster.ResponseFuture.result
cassandra.OperationTimedOut: errors={'127.0.0.1': 'Client request timeout. See Session.execute[_async](timeout)'}, last_host=127.0.0.1
Upvotes: 0
Views: 1985
Reputation: 87234
By using batches you're killing your Cassandra. Batches in Cassandra serve a specific purpose; they are not meant for submitting multiple records together (unless the records all belong to the same partition) - you can read about batch misuse in the documentation. A more effective way is to use prepared statements with asynchronous execution of queries via execute_async: the getting-started section of the driver documentation has examples. With this approach each query goes directly to the machine that holds the data for that specific partition, instead of loading the coordinator node as happens with batches.
Another error you have is that you're preparing the query inside the loop - prepare it once, before the first for loop, and then reuse the prepared query inside the loop. You may also need to increase the number of in-flight requests per connection to saturate the network.
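A minimal sketch of that approach - prepare the statement once, then submit each insert with execute_async. The table/keyspace names are placeholders, not from the question; the column names (id, version, row) mirror the question's schema:

```python
# Sketch: prepare once, execute_async per row, then wait on all futures.
# "my_table" / "my_keyspace" below are hypothetical placeholder names.

def build_insert_cql(table):
    # Bind markers (?) instead of interpolated values; columns mirror
    # the question's schema: id, version, row.
    return "INSERT INTO {} (id, version, row) VALUES (?, ?, ?)".format(table)

def bulk_insert(session, table, rows):
    """Prepare the statement once, submit every insert asynchronously,
    and wait for all of them at the end."""
    prepared = session.prepare(build_insert_cql(table))  # once, outside loops
    futures = [session.execute_async(prepared, row) for row in rows]
    for future in futures:
        future.result()  # blocks; re-raises if that insert failed

# Real usage (requires cassandra-driver and a running node):
#   from cassandra.cluster import Cluster
#   session = Cluster(['localhost']).connect('my_keyspace')
#   bulk_insert(session, 'my_table', [("1", "version_1", '{"a": 1}')])
```

For very large loads you may also want to cap how many futures are in flight at once (e.g. submit in windows of a few hundred) rather than holding 10M futures in memory at the same time.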
P.S. I answered the same question yesterday, but for Java.
Upvotes: 2