user1527491

Reputation: 965

Using async/await syntax with Twisted callbacks

I'd like to use async/await syntax with Twisted's Deferred.addCallback method. However, as the documentation states, a callback added with addCallback is called synchronously. I've seen the inlineCallbacks decorator used for this purpose, but I'd prefer async/await syntax (if that is even possible or meaningful).

I took the original code from the pika documentation, but I had no luck migrating it to async/await syntax:

import pika
from pika import exceptions
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol, task


async def run_async(connection):
    channel = await connection.channel()
    exchange = await channel.exchange_declare(exchange='topic_link', type='topic')
    queue = await channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)
    await channel.queue_bind(exchange='topic_link', queue='hello', routing_key='hello.world')
    await channel.basic_qos(prefetch_count=1)
    queue_object, consumer_tag = await channel.basic_consume(queue='hello', no_ack=False)
    l = task.LoopingCall(read_async, queue_object)
    l.start(0.01)


async def read_async(queue_object):
    ch, method, properties, body = await queue_object.get()
    if body:
        print(body)
    await ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('rabbitmq', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run_async)
reactor.run()

This obviously does not work, because nothing awaits the coroutine returned by run_async.
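To make the failure mode concrete, here is a minimal sketch using only the standard library. The names `callback` and `fire_synchronously` are hypothetical stand-ins (they are not Twisted APIs); the helper only imitates a callback chain that calls its callback as a plain function. The point is that calling an `async def` function merely creates a coroutine object, and its body does not run until something drives it.

```python
import inspect

calls = []


async def callback(value):
    # This body only runs once the coroutine is awaited or otherwise driven.
    calls.append(value)
    return value * 2


def fire_synchronously(cb, value):
    # Hypothetical stand-in for a callback chain that invokes the
    # callback as an ordinary function and uses its return value.
    return cb(value)


result = fire_synchronously(callback, 21)

is_coroutine = inspect.iscoroutine(result)  # a coroutine object came back
body_ran = bool(calls)                      # the body has not executed

# Close the coroutine explicitly to avoid a "never awaited" warning.
result.close()
```

So the callback "succeeds" immediately, but none of the awaited work inside it has happened.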

Upvotes: 3

Views: 4167

Answers (1)

user1527491

Reputation: 965

As pointed out by notorious.no and by the Twisted documentation, ensureDeferred is the way to go. Still, you have to wrap the callback's result (the coroutine object) and not the callback itself, which wasn't clear to me at first.

This is what the final code looks like:

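import functools

import pika
from pika.adapters import twisted_connection
from twisted.internet import defer, reactor, protocol, task


def ensure_deferred(f):
    # Decorator: calling the wrapped coroutine function returns a Deferred.
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Calling f only creates a coroutine object; ensureDeferred runs it.
        result = f(*args, **kwargs)
        return defer.ensureDeferred(result)
    return wrapper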


@ensure_deferred
async def run(connection):
    channel = await connection.channel()
    exchange = await channel.exchange_declare(exchange='topic_link', type='topic')
    queue = await channel.queue_declare(queue='hello', auto_delete=False, exclusive=False)
    await channel.queue_bind(exchange='topic_link', queue='hello', routing_key='hello.world')
    await channel.basic_qos(prefetch_count=1)
    queue_object, consumer_tag = await channel.basic_consume(queue='hello', no_ack=False)
    l = task.LoopingCall(read, queue_object)
    l.start(0.01)


@ensure_deferred
async def read(queue_object):
    ch, method, properties, body = await queue_object.get()
    if body:
        print(body)
    await ch.basic_ack(delivery_tag=method.delivery_tag)


parameters = pika.ConnectionParameters()
cc = protocol.ClientCreator(reactor, twisted_connection.TwistedProtocolConnection, parameters)
d = cc.connectTCP('rabbitmq', 5672)
d.addCallback(lambda protocol: protocol.ready)
d.addCallback(run)
reactor.run()

Thanks.

Upvotes: 9
