Reputation: 1624
I am trying to use Azure Service Bus as the broker for my celery app.
I have patched the solution by referring to various sources. The goal is to use Azure Service Bus as the broker and PostgresSQL as the backend.
I created an Azure Service Bus and copied the credentials for the RootManageSharedAccessKey
to the celery app.
Following is the task.py
from time import sleep
from celery import Celery
from kombu.utils.url import safequote
SAS_policy = safequote("RootManageSharedAccessKey") #SAS Policy
SAS_key = safequote("1234222zUY28tRUtp+A2YoHmDYcABCD") #Primary key from the previous SS
namespace = safequote("bluenode-dev")
app = Celery('tasks', backend='db+postgresql://afsan.gujarati:admin@localhost/local_dev',
broker=f'azureservicebus://{SAS_policy}:{SAS_key}=@{namespace}')
@app.task
def divide(x, y):
sleep(30)
return x/y
When I try to run the Celery app using the following command:
celery -A tasks worker --loglevel=INFO
I get the following error
[2020-10-09 14:00:32,035: CRITICAL/MainProcess] Unrecoverable error: AzureHttpError('Unauthorized\n<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>')
Traceback (most recent call last):
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 918, in create_channel
return self._avail_channels.pop()
IndexError: pop from empty list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1225, in _perform_request
resp = self._filter(request)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_http/httpclient.py", line 211, in perform_request
raise HTTPError(status, message, respheaders, respbody)
azure.servicebus.control_client._http.HTTPError: Unauthorized
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
self.blueprint.start(self)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 311, in start
blueprint.start(self)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 21, in start
c.connection = c.connect()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 398, in connect
conn = self.connection_for_read(heartbeat=self.amqheartbeat)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 404, in connection_for_read
return self.ensure_connected(
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 430, in ensure_connected
conn = conn.ensure_connection(
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 383, in ensure_connection
self._ensure_connection(*args, **kwargs)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 435, in _ensure_connection
return retry_over_time(
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
return fun(*args, **kwargs)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 866, in _connection_factory
self._connection = self._establish_connection()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 801, in _establish_connection
conn = self.transport.establish_connection()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 938, in establish_connection
self._avail_channels.append(self.create_channel(self))
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 920, in create_channel
channel = self.Channel(connection)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/azureservicebus.py", line 64, in __init__
for queue in self.queue_service.list_queues():
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 313, in list_queues
response = self._perform_request(request)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1227, in _perform_request
return _service_bus_error_handler(ex)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_serialization.py", line 569, in _service_bus_error_handler
return _general_error_handler(http_error)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_common_error.py", line 41, in _general_error_handler
raise AzureHttpError(message, http_error.status)
azure.common.AzureHttpError: Unauthorized
<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>
I don't see a straight solution for this anywhere. What am I missing?
P.S. I did not create the Queue in Azure Service Bus. I am assuming that celery would create the Queue by itself when the celery app is executed.
P.S.S. I also tried to use the exact same credentials in Python's Service Bus Client and it seemed to work. It feels like a Celery issue, but I am not able to figure out exactly what.
Upvotes: 0
Views: 3277
Reputation: 23141
If you want to use Azure Service Bus Transport to connect Azure service bus, the URL should be azureservicebus://{SAS policy name}:{SAS key}@{Service Bus Namespace}
.
For example
RootManageSharedAccessKey
from celery import Celery
from kombu.utils.url import safequote
SAS_policy = "RootManageSharedAccessKey" # SAS Policy
# Primary key from the previous SS
SAS_key = safequote("X/*****qyY=")
namespace = "bowman1012"
app = Celery('tasks', backend='db+postgresql://<>@localhost/<>',
broker=f'azureservicebus://{SAS_policy}:{SAS_key}@{namespace}')
@app.task
def add(x, y):
return x + y
Upvotes: 6