Reputation: 164
I have a Django application using Celery, SQS and S3. When I run the following function with Django, Celery and SQS, it works and prints 'hello world' every minute, as it should.
from celery.task import periodic_task
from celery.schedules import crontab
@periodic_task(run_every=crontab(hour='*', minute='*', day_of_week="*"))
def print_hello():
    print('hello world')
But the app is also linked to an S3 bucket: whenever a new file is saved to S3, a notification is sent to the SQS queue. The problem happens when one of these notification messages reaches the queue. The worker fails, stops the periodic task print_hello(), and gives this error message:
[2019-11-07 22:10:57,173: CRITICAL/MainProcess] Unrecoverable error: Error('Incorrect padding') ...parserinvoker/lib64/python3.7/base64.py", line 87, in b64decode return binascii.a2b_base64(s) binascii.Error: Incorrect padding
and then quits. I've been looking at the documentation and trying to troubleshoot all week, but I haven't found a solution. I'm including my settings.py in case it's a configuration issue.
settings.py
BROKER_URL = "sqs://"
CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TASK_SERIALIZER = 'json'
CELERY_DEFAULT_QUEUE = env('CELERY_DEFAULT_QUEUE')
CELERY_RESULT_BACKEND = None
BROKER_TRANSPORT_OPTIONS = {
'region': 'us-east-1',
'polling_interval': 20,
'visibility_timeout': 3600,
'task_default_queue': env('CELERY_DEFAULT_QUEUE'),
}
Upvotes: 0
Views: 1158
Reputation: 15916
The format of the JSON payload that Celery expects on the queue is different from the one that SQS receives from S3, and that mismatch is what crashes the worker. To process these notifications properly, you may want a separate periodic task that periodically drains the S3 notification queue, rather than sending the S3 notifications to the Celery broker queue. The S3 message body will look as described in the Amazon documentation here. Here is a sample 2.1 record that is sent from S3 to SQS:
{
   "Records":[
      {
         "eventVersion":"2.1",
         "eventSource":"aws:s3",
         "awsRegion":"us-west-2",
         "eventTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, when Amazon S3 finished processing the request",
         "eventName":"event-type",
         "userIdentity":{
            "principalId":"Amazon-customer-ID-of-the-user-who-caused-the-event"
         },
         "requestParameters":{
            "sourceIPAddress":"ip-address-where-request-came-from"
         },
         "responseElements":{
            "x-amz-request-id":"Amazon S3 generated request ID",
            "x-amz-id-2":"Amazon S3 host that processed the request"
         },
         "s3":{
            "s3SchemaVersion":"1.0",
            "configurationId":"ID found in the bucket notification configuration",
            "bucket":{
               "name":"bucket-name",
               "ownerIdentity":{
                  "principalId":"Amazon-customer-ID-of-the-bucket-owner"
               },
               "arn":"bucket-ARN"
            },
            "object":{
               "key":"object-key",
               "size":"object-size",
               "eTag":"object eTag",
               "versionId":"object version if bucket is versioning-enabled, otherwise null",
               "sequencer":"a string representation of a hexadecimal value used to determine event sequence, only used with PUTs and DELETEs"
            }
         },
         "glacierEventData":{
            "restoreEventData":{
               "lifecycleRestorationExpiryTime":"The time, in ISO-8601 format, for example, 1970-01-01T00:00:00.000Z, of Restore Expiry",
               "lifecycleRestoreStorageClass":"Source storage class for restore"
            }
         }
      }
   ]
}
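To sketch the drain-and-parse approach: the function below extracts (bucket, key) pairs from a raw S3 notification body. It is a minimal sketch with illustrative names; in the real periodic task you would wrap it in a loop that polls the notification queue with boto3's receive_message and acknowledges handled messages with delete_message (omitted here to keep the sketch self-contained).

```python
import json

def parse_s3_event(body):
    """Extract (bucket, key) pairs from an S3 event notification body.

    `body` is the raw SQS message body string. Messages without a
    "Records" list (e.g. the s3:TestEvent sent when the notification
    is first configured) yield an empty list.
    """
    payload = json.loads(body)
    results = []
    for record in payload.get("Records", []):
        s3 = record.get("s3", {})
        bucket = s3.get("bucket", {}).get("name")
        key = s3.get("object", {}).get("key")
        if bucket and key:
            results.append((bucket, key))
    return results
```

Keeping the notification queue separate from the broker queue means the Celery worker never sees these raw JSON bodies, so it cannot crash on them.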
The Celery message format is quite different; it is described in the Celery message protocol documentation.
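For contrast, here is a hedged sketch of the envelope kombu's SQS transport expects (all field values are illustrative, not taken from a real queue). The "body" field is base64-encoded, and kombu base64-decodes every message it pulls from SQS; a raw S3 notification is plain JSON, not base64, which is where the "Incorrect padding" error comes from.

```json
{
   "body": "W1tdLCB7fSwge31d",
   "content-type": "application/json",
   "content-encoding": "utf-8",
   "headers": {
      "lang": "py",
      "task": "myapp.tasks.print_hello",
      "id": "illustrative-task-uuid",
      "root_id": "illustrative-task-uuid",
      "parent_id": null,
      "group": null
   },
   "properties": {
      "correlation_id": "illustrative-task-uuid",
      "delivery_mode": 2
   }
}
```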
Upvotes: 1