I'm trying to load and delete data from Cassandra using the Python driver. I have tried this both with Cassandra running in a Docker container and with a local install, after the Docker version gave me problems. Here's an example of what I'm doing:
from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args

class Controller(object):
    def __init__(self):
        self.cluster = Cluster()
        self.session = self.cluster.connect('mykeyspace')

    def insert_into_cassandra(self, fname):
        query = 'INSERT INTO mytable (mykey, indexed_key) VALUES (?, ?)'
        prepared = self.session.prepare(query)
        prepared.consistency_level = ConsistencyLevel.QUORUM
        # params_generator (not shown) yields (mykey, indexed_key) pairs read from fname
        params_gen = self.params_generator(fname)
        execute_concurrent_with_args(self.session, prepared, params_gen, concurrency=50)

    def delete_param_gen(self, results):
        for r in results:
            yield [r.mykey]

    def delete_by_index(self, value):
        # Bind the value instead of interpolating it into the CQL string
        query = "SELECT mykey FROM mytable WHERE indexed_key = %s"
        res = self.session.execute(query, (value,))
        delete_query = "DELETE FROM mytable WHERE mykey = ?"
        prepared = self.session.prepare(delete_query)
        prepared.consistency_level = ConsistencyLevel.QUORUM
        params_gen = self.delete_param_gen(res)
        execute_concurrent_with_args(self.session, prepared, params_gen, concurrency=50)
Nothing crazy. When loading/deleting data, I frequently see the following messages:
Sending options message heartbeat on idle connection (4422117360) 127.0.0.1
Heartbeat failed for connection (4422117360) to 127.0.0.1
Here are some logs from deleting data.
[2017-02-28 08:37:20,562] [DEBUG] [cassandra.connection] Defuncting connection (4422117360) to 127.0.0.1: errors=Connection heartbeat timeout after 30 seconds, last_host=127.0.0.1
[2017-02-28 08:37:20,563] [DEBUG] [cassandra.io.libevreactor] Closing connection (4422117360) to 127.0.0.1
[2017-02-28 08:37:20,563] [DEBUG] [cassandra.io.libevreactor] Closed socket to 127.0.0.1
[2017-02-28 08:37:20,564] [DEBUG] [cassandra.pool] Defunct or closed connection (4422117360) returned to pool, potentially marking host 127.0.0.1 as down
[2017-02-28 08:37:20,566] [DEBUG] [cassandra.pool] Replacing connection (4422117360) to 127.0.0.1
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.connection] Defuncting connection (4426057600) to 127.0.0.1: errors=Connection heartbeat timeout after 30 seconds, last_host=127.0.0.1
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.io.libevreactor] Closing connection (4426057600) to 127.0.0.1
[2017-02-28 08:37:20,567] [DEBUG] [cassandra.io.libevreactor] Closed socket to 127.0.0.1
[2017-02-28 08:37:20,568] [ERROR] [cassandra.cluster] Unexpected exception while handling result in ResponseFuture:
Traceback (most recent call last):
File "cassandra/cluster.py", line 3536, in cassandra.cluster.ResponseFuture._set_result (cassandra/cluster.c:67556)
File "cassandra/cluster.py", line 3711, in cassandra.cluster.ResponseFuture._set_final_result (cassandra/cluster.c:71769)
File "cassandra/concurrent.py", line 154, in cassandra.concurrent._ConcurrentExecutor._on_success (cassandra/concurrent.c:3357)
File "cassandra/concurrent.py", line 203, in cassandra.concurrent.ConcurrentExecutorListResults._put_result (cassandra/concurrent.c:5539)
File "cassandra/concurrent.py", line 209, in cassandra.concurrent.ConcurrentExecutorListResults._put_result (cassandra/concurrent.c:5427)
File "cassandra/concurrent.py", line 123, in cassandra.concurrent._ConcurrentExecutor._execute_next (cassandra/concurrent.c:2369)
File "load_cassandra.py", line 148, in delete_param_gen
for r in rows:
File "cassandra/cluster.py", line 3991, in cassandra.cluster.ResultSet.next (cassandra/cluster.c:76025)
File "cassandra/cluster.py", line 4006, in cassandra.cluster.ResultSet.fetch_next_page (cassandra/cluster.c:76193)
File "cassandra/cluster.py", line 3781, in cassandra.cluster.ResponseFuture.result (cassandra/cluster.c:73073)
cassandra.cluster.NoHostAvailable: ('Unable to complete the operation against any hosts', {})
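In this traceback, delete_param_gen is advanced from inside _ConcurrentExecutor._on_success, and stepping through the ResultSet calls fetch_next_page, which blocks in ResponseFuture.result(). If that blocking page fetch runs on the driver's I/O thread (an assumption, though it is consistent with the stack above), it could stall the connection long enough for heartbeats to time out. A minimal sketch of one workaround is to read every matching key into a list first and only then hand the list to execute_concurrent_with_args. collect_delete_params and the standalone delete_by_index are hypothetical helpers; the driver imports are deferred into the function only so the pure helper can be exercised without the driver installed.

```python
def collect_delete_params(rows):
    """Return one (mykey,) tuple per row, draining every page of the result.

    Iterating a paged ResultSet can block while the next page is fetched, so
    this runs on the calling thread before any concurrent deletes start.
    """
    return [(row.mykey,) for row in rows]


def delete_by_index(session, value):
    """Delete every row whose indexed_key equals value (hypothetical helper)."""
    # Deferred imports: only this function needs the DataStax driver.
    from cassandra import ConsistencyLevel
    from cassandra.concurrent import execute_concurrent_with_args

    rows = session.execute(
        "SELECT mykey FROM mytable WHERE indexed_key = %s", (value,))
    params = collect_delete_params(rows)  # all keys are in memory from here on

    prepared = session.prepare("DELETE FROM mytable WHERE mykey = ?")
    prepared.consistency_level = ConsistencyLevel.QUORUM
    return execute_concurrent_with_args(
        session, prepared, params, concurrency=50)
```

The trade-off is memory: every matching key is held in a list before the deletes start.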
And here are some from inserting data:
[2017-02-28 16:50:25,594] [DEBUG] [cassandra.connection] Sending options message heartbeat on idle connection (140301574604448) 127.0.0.1
[2017-02-28 16:50:25,595] [DEBUG] [cassandra.cluster] [control connection] Attempting to reconnect
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.cluster] [control connection] Opening new connection to 127.0.0.1
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Not sending options message for new connection(140301347717016) to 127.0.0.1 because compression is disabled and a cql version was not specified
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Sending StartupMessage on <AsyncoreConnection(140301347717016) 127.0.0.1:9042>
[2017-02-28 16:50:25,596] [DEBUG] [cassandra.connection] Sent StartupMessage on <AsyncoreConnection(140301347717016) 127.0.0.1:9042>
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.io.asyncorereactor] Closing connection (140301347717016) to 127.0.0.1
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.io.asyncorereactor] Closed socket to 127.0.0.1
[2017-02-28 16:50:30,596] [DEBUG] [cassandra.connection] Connection to 127.0.0.1 was closed during the startup handshake
[2017-02-28 16:50:30,597] [WARNING] [cassandra.cluster] [control connection] Error connecting to 127.0.0.1:
Traceback (most recent call last):
File "cassandra/cluster.py", line 2623, in cassandra.cluster.ControlConnection._reconnect_internal (cassandra/cluster.c:47899)
File "cassandra/cluster.py", line 2645, in cassandra.cluster.ControlConnection._try_connect (cassandra/cluster.c:48416)
File "cassandra/cluster.py", line 1119, in cassandra.cluster.Cluster.connection_factory (cassandra/cluster.c:15085)
File "cassandra/connection.py", line 333, in cassandra.connection.Connection.factory (cassandra/connection.c:5790)
cassandra.OperationTimedOut: errors=Timed out creating connection (5 seconds), last_host=None
[2017-02-28 16:50:39,309] [ERROR] [root] Exception inserting data into cassandra
Traceback (most recent call last):
File "load_cassandra.py", line 54, in run
controller.insert_into_cassandra(filename)
File "extract_to_cassandra.py", line 141, in insert_into_cassandra
for success, result in results:
File "cassandra/concurrent.py", line 177, in _results (cassandra/concurrent.c:4856)
File "cassandra/concurrent.py", line 186, in cassandra.concurrent.ConcurrentExecutorGenResults._results (cassandra/concurrent.c:4622)
File "cassandra/concurrent.py", line 165, in cassandra.concurrent._ConcurrentExecutor._raise (cassandra/concurrent.c:3745)
cassandra.WriteTimeout: Error from server: code=1100 [Coordinator node timed out waiting for replica nodes' responses] message="Operation timed out - received only 0 responses." info={'consistency': 'QUORUM', 'required_responses': 1, 'received_responses': 0}
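Here the WriteTimeout is raised out of the results loop and aborts the insert. A minimal sketch, assuming you would rather finish the batch and then retry or log the failures: pass raise_on_first_error=False to execute_concurrent_with_args, which then returns one (success, result_or_exc) pair per parameter set, in the same order as the parameters, and pick out the failures. failed_params and insert_all are hypothetical helpers, and the concurrency value is simply the one used in the question.

```python
def failed_params(params, results):
    """Return (params, exception) pairs for statements that did not succeed.

    results is the list returned by execute_concurrent_with_args with
    raise_on_first_error=False: one (success, result_or_exc) pair per
    parameter tuple, in the same order as params.
    """
    return [(p, outcome)
            for p, (success, outcome) in zip(params, results)
            if not success]


def insert_all(session, prepared, params, concurrency=50):
    """Run all inserts and report failures instead of stopping at the first."""
    # Deferred import: only this function needs the DataStax driver.
    from cassandra.concurrent import execute_concurrent_with_args

    params = list(params)  # iterated twice: once to execute, once to match results
    results = execute_concurrent_with_args(
        session, prepared, params,
        concurrency=concurrency, raise_on_first_error=False)
    return failed_params(params, results)
```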
[2017-02-28 16:50:39,465] [DEBUG] [cassandra.connection] Received options response on connection (140301574604448) from 127.0.0.1
[2017-02-28 16:50:39,466] [DEBUG] [cassandra.cluster] Shutting down Cluster Scheduler
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.cluster] Shutting down control connection
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.io.asyncorereactor] Closing connection (140301574604448) to 127.0.0.1
[2017-02-28 16:50:39,467] [DEBUG] [cassandra.io.asyncorereactor] Closed socket to 127.0.0.1
[2017-02-28 16:50:39,468] [DEBUG] [cassandra.pool] Defunct or closed connection (140301574604448) returned to pool, potentially marking host 127.0.0.1 as down
I experimented with the consistency level and even set it to ONE, but that didn't help. Inserts tend to work better when Cassandra runs locally rather than in Docker, but they still time out. Deletes usually work for a couple of seconds and then hang or time out.
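The WriteTimeout in the insert log reports 'required_responses': 1 under QUORUM. That is what QUORUM works out to when the replication factor is 1, which is an assumption here since the keyspace definition isn't shown; in that case ONE asks for the same single acknowledgement, which may be why lowering the consistency level made no difference. The arithmetic, as a small sketch for a single data center (quorum and required_acks are illustrative names, not driver APIs):

```python
def quorum(replication_factor):
    """Replica acks needed for QUORUM in a single data center."""
    return replication_factor // 2 + 1


def required_acks(level, replication_factor):
    """Replica acks needed for a few consistency levels (single data center)."""
    needed = {
        "ONE": 1,
        "QUORUM": quorum(replication_factor),
        "ALL": replication_factor,
    }
    return needed[level]
```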
Edit: Here are the Cassandra logs from when things fail:
INFO 18:39:11 MUTATION messages were dropped in last 5000 ms: 4 for internal timeout and 0 for cross node timeout. Mean internal dropped latency: 2933809 ms and Mean cross-node dropped latency: 0 ms
INFO 18:39:11 Pool Name Active Pending Completed Blocked All Time Blocked
INFO 18:39:11 MutationStage 32 15 470 0 0
INFO 18:39:11 ViewMutationStage 0 0 0 0 0
INFO 18:39:11 ReadStage 0 0 59 0 0
INFO 18:39:11 RequestResponseStage 0 0 0 0 0
INFO 18:39:11 ReadRepairStage 0 0 0 0 0
INFO 18:39:11 CounterMutationStage 0 0 0 0 0
INFO 18:39:11 MiscStage 0 0 0 0 0
INFO 18:39:11 CompactionExecutor 0 0 6399 0 0
INFO 18:39:11 MemtableReclaimMemory 0 0 36 0 0
INFO 18:39:11 PendingRangeCalculator 0 0 1 0 0
INFO 18:39:11 GossipStage 0 0 0 0 0
INFO 18:39:11 SecondaryIndexManagement 0 0 0 0 0
INFO 18:39:11 HintsDispatcher 0 0 0 0 0
INFO 18:39:11 MigrationStage 0 0 2 0 0
INFO 18:39:11 MemtablePostFlush 0 0 62 0 0
INFO 18:39:11 PerDiskMemtableFlushWriter_0 0 0 36 0 0
INFO 18:39:11 ValidationExecutor 0 0 0 0 0
INFO 18:39:11 Sampler 0 0 0 0 0
INFO 18:39:11 MemtableFlushWriter 0 0 36 0 0
INFO 18:39:11 InternalResponseStage 0 0 0 0 0
INFO 18:39:11 AntiEntropyStage 0 0 0 0 0
INFO 18:39:11 CacheCleanupExecutor 0 0 0 0 0
INFO 18:39:11 Native-Transport-Requests 33 0 727 0 0
INFO 18:39:11 CompactionManager 0 0
INFO 18:39:11 MessagingService n/a 0/0
INFO 18:39:11 Cache Type Size Capacity KeysToSave
INFO 18:39:11 KeyCache 1368 51380224 all
INFO 18:39:11 RowCache 0 0 all
INFO 18:39:11 Table Memtable ops,data
INFO 18:39:11 system_distributed.parent_repair_history 0,0
INFO 18:39:11 system_distributed.repair_history 0,0
INFO 18:39:11 system_distributed.view_build_status 0,0
INFO 18:39:11 system.compaction_history 1,231
INFO 18:39:11 system.hints 0,0
INFO 18:39:11 system.schema_aggregates 0,0
INFO 18:39:11 system.IndexInfo 0,0
INFO 18:39:11 system.schema_columnfamilies 0,0
INFO 18:39:11 system.schema_triggers 0,0
INFO 18:39:11 system.size_estimates 40,1255
INFO 18:39:11 system.schema_functions 0,0
INFO 18:39:11 system.paxos 0,0
INFO 18:39:11 system.views_builds_in_progress 0,0
INFO 18:39:11 system.built_views 0,0
INFO 18:39:11 system.peer_events 0,0
INFO 18:39:11 system.range_xfers 0,0
INFO 18:39:11 system.peers 0,0
INFO 18:39:11 system.batches 0,0
INFO 18:39:11 system.schema_keyspaces 0,0
INFO 18:39:11 system.schema_usertypes 0,0
INFO 18:39:11 system.local 0,0
INFO 18:39:11 system.sstable_activity 6,117
INFO 18:39:11 system.available_ranges 0,0
INFO 18:39:11 system.batchlog 0,0
INFO 18:39:11 system.schema_columns 0,0
INFO 18:39:11 system_schema.columns 0,0
INFO 18:39:11 system_schema.types 0,0
INFO 18:39:11 system_schema.indexes 0,0
INFO 18:39:11 system_schema.keyspaces 0,0
INFO 18:39:11 system_schema.dropped_columns 0,0
INFO 18:39:11 system_schema.aggregates 0,0
INFO 18:39:11 system_schema.triggers 0,0
INFO 18:39:11 system_schema.tables 0,0
INFO 18:39:11 system_schema.views 0,0
INFO 18:39:11 system_schema.functions 0,0
INFO 18:39:11 system_auth.roles 0,0
INFO 18:39:11 system_auth.role_members 0,0
INFO 18:39:11 system_auth.resource_role_permissons_index 0,0
INFO 18:39:11 system_auth.role_permissions 0,0
INFO 18:39:11 mykeyspace.mytable 430,27163514
INFO 18:39:11 system_traces.sessions 0,0
INFO 18:39:11 system_traces.events 0,0
INFO 18:39:13 ParNew GC in 261ms. CMS Old Gen: 46106544 -> 74868512; Par Eden Space: 208895224 -> 0; Par Survivor Space: 16012448 -> 26083328
I see messages like this too:
Out of 29 commit log syncs over the past 248s with average duration of 1596.14ms, 1 have exceeded the configured commit interval by an average of 18231.00ms
One thing you could try is to reduce the idle_heartbeat_interval setting on your Cluster. It defaults to 30 seconds, but you can configure it when instantiating your Cluster class. In this example, I'll set it to 10 seconds:
from cassandra.cluster import Cluster

class Controller(object):
    def __init__(self):
        self.cluster = Cluster(idle_heartbeat_interval=10)
        self.session = self.cluster.connect('mykeyspace')
If that doesn't help, then it might be time to check your data model for anti-patterns.