Reputation: 311
My intention is to create the cqlengine.models.Model
instance in a separate process and send it via queue to the database manager.
Unfortunatelly I am getting the TypeError if the multiprocessing
queue
is used
from cassandra.cqlengine import columns
from cassandra.cqlengine.models import Model
class MyModel(Model):
__keyspace__='mykeyspace'
class Meta:
get_pk_field = 'uid'
uid = columns.UUID(partition_key = True, default=uuid.uuid4)
start = columns.TimeUUID(primary_key = True, clustering_order='desc')
interrupt = columns.List(columns.Tuple(columns.TimeUUID(),columns.TimeUUID(), columns.Text()))
stop = columns.TimeUUID()
Following code connects to the Cassandra database, creates a model instance and stores it into the database:
from cassandra.cqlengine import connection
from vo.record import MyModel
import uuid
#Queue
queue = multiprocessing.Queue()
#Connect to the database
connection.setup(('localhost',),'mykeyspace')
#Create uuid for the record
uuid = uuid.uuid1()
#Prepare the new record
wt = Workingtime(start = uuid, uid = uuid)
#Store it
wt.save()
However, if I put the model in the queue and dequeue it later, the TypeError is raised.
queue.put(wt)
item = queue.get()
item.save()
If the model property _timeout is printed out, the two distinct objects id are shown (as expected)
print wt._timeout
print item._timeout
Output:
<object object at 0x76d32508>
<object object at 0x76d32e28>
File "test.py", line 25, in <module>
item.save()
File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/models.py", line 731, in save
if_exists=self._if_exists).save()
File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1437, in save
return self.update()
File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1420, in update
self._delete_null_columns(delete_conditionals)
File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1369, in _delete_null_columns
self._execute(ds)
File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1334, in _execute
results = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection)
File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1485, in _execute_statement
return conn.execute(s, params, timeout=timeout, connection=connection)
File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/connection.py", line 286, in execute
result = conn.session.execute(query, params, timeout=timeout)
File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1998, in execute
return self.execute_async(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state).result()
File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 2038, in execute_async
future.send_request()
File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3365, in send_request
self._start_timer()
File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3316, in _start_timer
self._timer = self.session.cluster.connection_class.create_timer(self._time_remaining, self._on_timeout)
File "/usr/local/lib/python2.7/dist-packages/cassandra/io/asyncorereactor.py", line 289, in create_timer
timer = Timer(timeout, callback)
File "/usr/local/lib/python2.7/dist-packages/cassandra/connection.py", line 1030, in __init__
self.end = time.time() + timeout
TypeError: unsupported operand type(s) for +: 'float' and 'object'
Upvotes: 2
Views: 307
Reputation: 311
The timeout must be explicitly defined:
wt.timeout(0)
I am not able to explain why the save() does not work with multiprocessing queue.
Upvotes: 1