karelok
karelok

Reputation: 311

TypeError: unsupported operand type(s) when the model is taken off queue

My intention is to create the cqlengine.models.Model instance in a separate process and send it via queue to the database manager.

Unfortunatelly I am getting the TypeError if the multiprocessing queue is used

The model

from cassandra.cqlengine import columns 
from cassandra.cqlengine.models import Model

class MyModel(Model):
    __keyspace__='mykeyspace'

    class Meta:
        get_pk_field = 'uid'

    uid = columns.UUID(partition_key = True, default=uuid.uuid4)
    start = columns.TimeUUID(primary_key = True, clustering_order='desc')
    interrupt = columns.List(columns.Tuple(columns.TimeUUID(),columns.TimeUUID(), columns.Text()))
    stop = columns.TimeUUID()

Example

Following code connects to the Cassandra database, creates a model instance and stores it into the database:

    from cassandra.cqlengine import connection
    from vo.record import MyModel

    import uuid

    #Queue
    queue = multiprocessing.Queue()
    #Connect to the database
    connection.setup(('localhost',),'mykeyspace')

    #Create uuid for the record
    uuid = uuid.uuid1()
    #Prepare the new record
    wt = Workingtime(start = uuid, uid = uuid)

    #Store it
    wt.save()

However, if I put the model in the queue and dequeue it later, the TypeError is raised.

    queue.put(wt)
    item = queue.get()
    item.save()

If the model property _timeout is printed out, the two distinct objects id are shown (as expected)

print wt._timeout
print item._timeout

Output:

<object object at 0x76d32508>
<object object at 0x76d32e28>

Traceback (most recent call last):

  File "test.py", line 25, in <module>
    item.save() 
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/models.py", line 731, in save
    if_exists=self._if_exists).save()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1437, in save
    return self.update()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1420, in update
    self._delete_null_columns(delete_conditionals)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1369, in _delete_null_columns
    self._execute(ds)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1334, in _execute
    results = _execute_statement(self.model, statement, self._consistency, self._timeout, connection=connection)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/query.py", line 1485, in _execute_statement
    return conn.execute(s, params, timeout=timeout, connection=connection)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cqlengine/connection.py", line 286, in execute
    result = conn.session.execute(query, params, timeout=timeout)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 1998, in execute
    return self.execute_async(query, parameters, trace, custom_payload, timeout, execution_profile, paging_state).result()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 2038, in execute_async
    future.send_request()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3365, in send_request
    self._start_timer()
  File "/usr/local/lib/python2.7/dist-packages/cassandra/cluster.py", line 3316, in _start_timer
    self._timer = self.session.cluster.connection_class.create_timer(self._time_remaining, self._on_timeout)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/io/asyncorereactor.py", line 289, in create_timer
    timer = Timer(timeout, callback)
  File "/usr/local/lib/python2.7/dist-packages/cassandra/connection.py", line 1030, in __init__
    self.end = time.time() + timeout
TypeError: unsupported operand type(s) for +: 'float' and 'object'

How do I prevent the error?

Upvotes: 2

Views: 307

Answers (1)

karelok
karelok

Reputation: 311

The timeout must be explicitly defined:

wt.timeout(0)

I am not able to explain why the save() does not work with multiprocessing queue.

Upvotes: 1

Related Questions