Matt Dell

Reputation: 9541

How can I automatically move messages off DLQ in Amazon SQS?

How can I automatically move messages from a dead letter queue back to the original queue in Amazon SQS?

Upvotes: 145

Views: 119698

Answers (17)

Dániel Berki

Reputation: 1

If someone needs an even more complex redrive, because the messages also need to be corrected or transformed before being re-queued, I recommend Message Silo: https://github.com/MessageSilo/MessageSilo

Upvotes: 0

y2k-shubham

Reputation: 11627

While DLQ redrive via the AWS Console has been available since Dec 2021, in June 2023 AWS announced support for programmatic redrive via the AWS SDK or CLI.


Quoting the article above (a minimal boto3 sketch follows the list):

To programmatically automate dead-letter queue message redrive workflows, customers can now use the following actions:

  1. StartMessageMoveTask, to start a new message movement task from the dead-letter queue;
  2. CancelMessageMoveTask, to cancel the message movement task;
  3. ListMessageMoveTasks, to get the 10 most recent message movement tasks for a specified source queue.
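
For example, a rough boto3 sketch of a programmatic redrive (assuming a recent boto3 version that includes these actions; the ARN is a placeholder):

import boto3

sqs = boto3.client('sqs')

DLQ_ARN = 'arn:aws:sqs:us-east-1:123456789012:my-dlq'  # placeholder ARN

# Start moving messages out of the DLQ. DestinationArn is optional and
# defaults to the queue(s) the messages originally came from.
task = sqs.start_message_move_task(
    SourceArn=DLQ_ARN,
    MaxNumberOfMessagesPerSecond=10,  # optional rate limit
)
print(task['TaskHandle'])

# Check on the 10 most recent move tasks for this source queue.
tasks = sqs.list_message_move_tasks(SourceArn=DLQ_ARN, MaxResults=10)
print(tasks['Results'])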

Upvotes: 1

rodorgas

Reputation: 1092

Most answers are outdated. You can now redrive messages from a DLQ to another queue with StartMessageMoveTaskCommand:

import { SQSClient, StartMessageMoveTaskCommand } from "@aws-sdk/client-sqs";

const client = new SQSClient({}); // uses the default region/credentials config

const input = {
  SourceArn: "ARN_OF_SOURCE_DLQ",
  DestinationArn: "ARN_OF_DESTINATION",
};

const command = new StartMessageMoveTaskCommand(input);
const response = await client.send(command);

You can check the docs at: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_StartMessageMoveTask.html

Upvotes: 2

Maali

Reputation: 41

Official SDK/CLI support for SQS DLQ redrive has launched (see here).

Upvotes: 1

Ulad Kasach

Reputation: 12868

Here is a simple Python script you can use from the CLI to do the same, depending only on boto3.

usage

python redrive_messages.py <from_queue_name> <to_queue_name>

code

import sys
import boto3

sqs = boto3.resource('sqs')

def redrive_messages(from_queue_name: str, to_queue_name: str):
  # initialize the queues
  from_queue = sqs.get_queue_by_name(QueueName=from_queue_name)
  to_queue = sqs.get_queue_by_name(QueueName=to_queue_name)

  # keep pulling until the source queue reports no more messages
  messages_processed = []
  while True:
    # grab the next message
    messages = from_queue.receive_messages(MaxNumberOfMessages=1)
    if len(messages) == 0:
      break
    message = messages[0]

    # requeue it
    to_queue.send_message(MessageBody=message.body, DelaySeconds=0)

    # let the queue know that the message was processed successfully
    messages_processed.append(message)
    message.delete()
  print(f'requeued {len(messages_processed)} messages')

if __name__ == '__main__':
  from_queue_name = sys.argv[1]
  to_queue_name = sys.argv[2]
  redrive_messages(from_queue_name, to_queue_name)

Upvotes: 0

elbik

Reputation: 1907

Here is a script (written in TypeScript) to move messages from one AWS queue to another. Maybe it will be useful for someone.


import {
    SQSClient,
    ReceiveMessageCommand,
    DeleteMessageBatchCommand,
    SendMessageBatchCommand,
} from '@aws-sdk/client-sqs'

const AWS_REGION = 'eu-west-1'
const AWS_ACCOUNT = '12345678901'

const DLQ = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/dead-letter-queue`
const QUEUE = `https://sqs.${AWS_REGION}.amazonaws.com/${AWS_ACCOUNT}/queue`

const loadMessagesFromDLQ = async () => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new ReceiveMessageCommand({
        QueueUrl: DLQ,
        MaxNumberOfMessages: 10,
        VisibilityTimeout: 60,
    })
    const response = await client.send(command)

    console.log('---------LOAD MESSAGES----------')
    console.log(`Loaded: ${response.Messages?.length}`)
    console.log(JSON.stringify(response, null, 4))
    return response
}

const sendMessagesToQueue = async (entries: Array<{Id: string, MessageBody: string}>) => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new SendMessageBatchCommand({
        QueueUrl: QUEUE,
        Entries: entries.map(entry => ({...entry, DelaySeconds: 10})),
        // [
        // {
        //     Id: '',
        //     MessageBody: '',
        //     DelaySeconds: 10
        // }
        // ]
    })
    const response = await client.send(command)
    console.log('---------SEND MESSAGES----------')
    console.log(`Send: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
    console.log(JSON.stringify(response, null, 4))
}

const deleteMessagesFromQueue = async (entries: Array<{Id: string, ReceiptHandle: string}>) => {
    const client = new SQSClient({region: AWS_REGION})
    const command = new DeleteMessageBatchCommand({
        QueueUrl: DLQ,
        Entries: entries,
        // [
        //     {
        //         "Id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
        //         "ReceiptHandle": "someReceiptHandle"
        //     }
        // ]
    })
    const response = await client.send(command)
    console.log('---------DELETE MESSAGES----------')
    console.log(`Delete: Successful - ${response.Successful?.length}, Failed: ${response.Failed?.length}`)
    console.log(JSON.stringify(response, null, 4))
}

const run = async () => {
    const dlqMessageList = await loadMessagesFromDLQ()

    if (!dlqMessageList || !dlqMessageList.Messages) {
        console.log('There are no messages in the DLQ')
        return
    }

    const sendMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, MessageBody: msg.Body}))
    const deleteMsgList: any = dlqMessageList.Messages.map(msg => ({ Id: msg.MessageId, ReceiptHandle: msg.ReceiptHandle}))

    await sendMessagesToQueue(sendMsgList)
    await deleteMessagesFromQueue(deleteMsgList)
}

run()


P.S. The script has room for improvement, but it might still be useful.

Upvotes: 1

FrostyOnion

Reputation: 996

On Dec 1, 2021, AWS released the ability to redrive messages from a DLQ back to the source queue (or a custom queue).

With dead-letter queue redrive to source queue, you can simplify and enhance your error-handling workflows for standard queues.

[Image: DLQ redrive]

Source: Introducing Amazon Simple Queue Service dead-letter queue redrive to source queues

Upvotes: 66

visrahane

Reputation: 335

An AWS Lambda solution worked well for us:

Detailed instructions: https://serverlessrepo.aws.amazon.com/applications/arn:aws:serverlessrepo:us-east-1:303769779339:applications~aws-sqs-dlq-redriver

Github: https://github.com/honglu/aws-sqs-dlq-redriver

Deployed with a click and another click to start the redrive!

Upvotes: 0

Ulad Kasach

Reputation: 12868

There are a few scripts out there that do this for you:

replay-aws-dlq:

# install
npm install replay-aws-dlq

# use
npx replay-aws-dlq [source_queue_url] [dest_queue_url]

sqsmover:

# compile: https://github.com/mercury2269/sqsmover#compiling-from-source

# use
sqsmover -s [source_queue_url] -d [dest_queue_url]

Upvotes: 44

rd2

Reputation: 331

A DLQ comes into play only when the original consumer fails to consume a message successfully after several attempts. We do not want to delete the message, since we believe we can still do something with it (maybe attempt to process it again, log it, or collect some stats), and we do not want to keep encountering it again and again, blocking the ability to process the messages behind it.

A DLQ is nothing but just another queue. Which means we would need to write a consumer for the DLQ, one that ideally runs less frequently than the original queue's consumer, and that consumes from the DLQ, produces the message back into the original queue, and deletes it from the DLQ - if that's the intended behavior and we think the original consumer is now ready to process it again. It should be OK if this cycle continues for a while, since we also get an opportunity to manually inspect the message, make necessary changes, and deploy another version of the original consumer without losing the message (within the message retention period, of course, which is 4 days by default). A sketch of such a consumer follows.

Would be nice if AWS provided this capability out of the box, but I don't see it yet - they're leaving this to the end user to use in the way they feel appropriate.
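
A minimal boto3 sketch of such a DLQ consumer (the queue URLs are placeholders; in practice you would run this on a schedule, e.g. from cron or a scheduled Lambda):

import boto3

sqs = boto3.client('sqs')

DLQ_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq'       # placeholder
SOURCE_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-queue'  # placeholder

def redrive_once():
    """Drain the DLQ once, re-enqueueing each message to the original queue."""
    while True:
        resp = sqs.receive_message(QueueUrl=DLQ_URL, MaxNumberOfMessages=10,
                                   WaitTimeSeconds=5)
        messages = resp.get('Messages', [])
        if not messages:
            break
        for m in messages:
            # produce the message back into the original queue...
            sqs.send_message(QueueUrl=SOURCE_URL, MessageBody=m['Body'])
            # ...and only then delete it from the DLQ
            sqs.delete_message(QueueUrl=DLQ_URL, ReceiptHandle=m['ReceiptHandle'])

if __name__ == '__main__':
    redrive_once()

Note the ordering: send first, delete second, so a crash in between causes at worst a duplicate delivery, never a lost message.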

Upvotes: 4

menrfa

Reputation: 1667

We use the following script to redrive messages from a source queue to a target queue:

filename: redrive.py

usage: python redrive.py -s {source queue name} -t {target queue name}

'''
This script is used to redrive messages in the (src) queue to the (tgt) queue.

The solution is to set the Target Queue as the Source Queue's Dead Letter Queue.
Also set the Source Queue's redrive policy, Maximum Receives, to 1.
Also set the Source Queue's VisibilityTimeout to 5 seconds (a small period).
Then read data from the Source Queue.

The Source Queue's Redrive Policy will move the messages to the Target Queue.
'''
import argparse
import json
import boto3
sqs = boto3.client('sqs')


def parse_args():
    parser = argparse.ArgumentParser()
    parser.add_argument('-s', '--src', required=True,
                        help='Name of source SQS')
    parser.add_argument('-t', '--tgt', required=True,
                        help='Name of targeted SQS')

    args = parser.parse_args()
    return args


def verify_queue(queue_name):
    # get_queue_url raises rather than returning an empty result,
    # so catch the exception to report a missing queue
    try:
        sqs.get_queue_url(QueueName=queue_name)
        return True
    except sqs.exceptions.QueueDoesNotExist:
        return False


def get_queue_attribute(queue_url):
    queue_attributes = sqs.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['All'])['Attributes']
    print(queue_attributes)

    return queue_attributes


def main():
    args = parse_args()
    for q in [args.src, args.tgt]:
        if not verify_queue(q):
            print(f"Cannot find {q} in AWS SQS")
            return

    src_queue_url = sqs.get_queue_url(QueueName=args.src)['QueueUrl']

    target_queue_url = sqs.get_queue_url(QueueName=args.tgt)['QueueUrl']
    target_queue_attributes = get_queue_attribute(target_queue_url)

    # Set the Source Queue's Redrive policy
    redrive_policy = {
        'deadLetterTargetArn': target_queue_attributes['QueueArn'],
        'maxReceiveCount': '1'
    }
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '5',
            'RedrivePolicy': json.dumps(redrive_policy)
        }
    )
    get_queue_attribute(src_queue_url)

    # read all messages
    num_received = 0
    while True:
        try:
            resp = sqs.receive_message(
                QueueUrl=src_queue_url,
                MaxNumberOfMessages=10,
                AttributeNames=['All'],
                WaitTimeSeconds=5)

            num_message = len(resp.get('Messages', []))
            if not num_message:
                break

            num_received += num_message
        except Exception:
            break
    print(f"Redrive {num_received} messages")

    # Reset the Source Queue's Redrive policy
    sqs.set_queue_attributes(
        QueueUrl=src_queue_url,
        Attributes={
            'VisibilityTimeout': '30',
            'RedrivePolicy': ''
        }
    )
    get_queue_attribute(src_queue_url)


if __name__ == "__main__":
    main()

Upvotes: 3

linehrr

Reputation: 1748

I wrote a small Python script to do this, using the boto3 lib:

import boto3

conf = {
  "sqs-access-key": "",
  "sqs-secret-key": "",
  "reader-sqs-queue": "",
  "writer-sqs-queue": "",
  "message-group-id": ""
}

client = boto3.client(
    'sqs',
    aws_access_key_id=conf.get('sqs-access-key'),
    aws_secret_access_key=conf.get('sqs-secret-key')
)

while True:
    messages = client.receive_message(QueueUrl=conf['reader-sqs-queue'],
                                      MaxNumberOfMessages=10, WaitTimeSeconds=10)

    if 'Messages' in messages:
        for m in messages['Messages']:
            print(m['Body'])
            send_kwargs = {'QueueUrl': conf['writer-sqs-queue'], 'MessageBody': m['Body']}
            # MessageGroupId is only valid for FIFO queues; omit it for standard queues
            if conf.get('message-group-id'):
                send_kwargs['MessageGroupId'] = conf['message-group-id']
            ret = client.send_message(**send_kwargs)
            print(ret)
            client.delete_message(QueueUrl=conf['reader-sqs-queue'],
                                  ReceiptHandle=m['ReceiptHandle'])
    else:
        print('Queue is currently empty or messages are invisible')
        break

You can get this script at this link.

This script can move messages between any two arbitrary queues, and it supports FIFO queues as well, since you can supply the MessageGroupId field (leave it empty for standard queues, and the script omits the parameter).

Upvotes: 9

Ash

Reputation: 1290

You don't need to move the message, because moving it comes with so many other challenges: duplicate messages, recovery scenarios, lost messages, de-duplication checks, etc.

Here is the solution we implemented:

Usually, we use the DLQ for transient errors, not for permanent errors, so we took the approach below (a code sketch follows the list):

  1. Read the message from the DLQ like a regular queue.

    Benefits
    • Avoids duplicate message processing
    • Better control over the DLQ - e.g. I put in a check to process the DLQ only when the regular queue is completely processed
    • Ability to scale the process up based on the messages in the DLQ
  2. Then follow the same code that the regular queue follows.

  3. This is more reliable in case the job is aborted or the process is terminated while processing (e.g. the instance is killed or the process crashes).

    Benefits
    • Code reusability
    • Error handling
    • Recovery and message replay
  4. Extend the message visibility so that no other thread processes the message.

    Benefit
    • Avoids multiple threads processing the same record
  5. Delete the message only on either success or a permanent error.

    Benefit
    • Keeps retrying the message as long as the errors are transient
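
For illustration, a minimal boto3 sketch of this approach, assuming a hypothetical process() function standing in for the regular queue's code, and hypothetical TransientError/PermanentError types standing in for your own error classification:

import boto3

sqs = boto3.client('sqs')
DLQ_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq'  # placeholder

class TransientError(Exception): pass   # placeholder error types
class PermanentError(Exception): pass

def process(body):
    ...  # placeholder: the same code the regular queue's consumer runs

def consume_dlq():
    resp = sqs.receive_message(QueueUrl=DLQ_URL, MaxNumberOfMessages=10,
                               WaitTimeSeconds=5)
    for m in resp.get('Messages', []):
        # extend visibility so no other worker picks this message up mid-processing
        sqs.change_message_visibility(QueueUrl=DLQ_URL,
                                      ReceiptHandle=m['ReceiptHandle'],
                                      VisibilityTimeout=300)
        try:
            process(m['Body'])
            delete = True            # success: remove from the DLQ
        except PermanentError:
            delete = True            # permanent error: no point in retrying
        except TransientError:
            delete = False           # transient error: leave it for a later run
        if delete:
            sqs.delete_message(QueueUrl=DLQ_URL, ReceiptHandle=m['ReceiptHandle'])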

Upvotes: 17

Priyanka Agarwal

Reputation: 49

There is another way to achieve this without writing a single line of code. Suppose your actual queue name is SQS_Queue and its DLQ is SQS_DLQ. Now follow these steps:

  1. Set SQS_Queue as the DLQ of SQS_DLQ. SQS_DLQ is already the DLQ of SQS_Queue, so now each queue is acting as the DLQ of the other.
  2. Set the max receive count of SQS_DLQ to 1.
  3. Now read messages from the SQS_DLQ console. Since the max receive count is 1, every message will be sent to SQS_DLQ's own DLQ, which is your actual SQS_Queue.

Upvotes: 3

Brian Dilley

Reputation: 4006

Here:

import boto3
import sys
import queue
import threading

# batches of messages flow from the main thread to the worker threads
work_queue = queue.Queue()

sqs = boto3.resource('sqs')

from_q_name = sys.argv[1]
to_q_name = sys.argv[2]
print("From: " + from_q_name + " To: " + to_q_name)

from_q = sqs.get_queue_by_name(QueueName=from_q_name)
to_q = sqs.get_queue_by_name(QueueName=to_q_name)

def process_queue():
    while True:
        messages = work_queue.get()

        # batch-send the bodies to the destination queue
        bodies = list()
        for i in range(0, len(messages)):
            bodies.append({'Id': str(i + 1), 'MessageBody': messages[i].body})

        to_q.send_messages(Entries=bodies)

        # delete from the source queue only after the send succeeded
        for message in messages:
            print("Copied " + str(message.body))
            message.delete()

        work_queue.task_done()

for i in range(10):
    t = threading.Thread(target=process_queue)
    t.daemon = True
    t.start()

while True:
    messages = list()
    for message in from_q.receive_messages(
            MaxNumberOfMessages=10,
            VisibilityTimeout=123,
            WaitTimeSeconds=20):
        messages.append(message)
    if not messages:
        break
    work_queue.put(messages)

# wait for the workers to finish the remaining batches
work_queue.join()

Upvotes: 6

Rajkumar

Reputation: 1695

Here is a quick hack. This is definitely not the best or recommended option.

  1. Set the main SQS queue as the DLQ for the actual DLQ, with Maximum Receives set to 1.
  2. View the content in the DLQ (this will move the messages to the main queue, since the main queue is now the DLQ of the actual DLQ).
  3. Remove the setting so that the main queue is no longer the DLQ of the actual DLQ (a scripted version of these steps follows).
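
For reference, a rough boto3 sketch that automates these three steps (the queue URL and ARN are placeholders; it is essentially the same technique as the redrive.py answer above, condensed):

import json
import boto3

sqs = boto3.client('sqs')
DLQ_URL = 'https://sqs.us-east-1.amazonaws.com/123456789012/my-dlq'  # placeholder
MAIN_QUEUE_ARN = 'arn:aws:sqs:us-east-1:123456789012:my-queue'       # placeholder

# Step 1: temporarily make the main queue the DLQ of the actual DLQ,
# with Maximum Receives = 1 and a short visibility timeout.
sqs.set_queue_attributes(QueueUrl=DLQ_URL, Attributes={
    'VisibilityTimeout': '5',
    'RedrivePolicy': json.dumps({
        'deadLetterTargetArn': MAIN_QUEUE_ARN,
        'maxReceiveCount': '1',
    }),
})

# Step 2: "view" the messages. Receiving a message a second time exceeds
# maxReceiveCount, so SQS moves it to the main queue. Any messages still
# in flight when the loop exits are moved on a later run.
while sqs.receive_message(QueueUrl=DLQ_URL, MaxNumberOfMessages=10,
                          WaitTimeSeconds=10).get('Messages'):
    pass

# Step 3: remove the temporary redrive policy (and restore the default
# 30-second visibility timeout).
sqs.set_queue_attributes(QueueUrl=DLQ_URL, Attributes={
    'VisibilityTimeout': '30',
    'RedrivePolicy': '',
})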

Upvotes: 167

Dave

Reputation: 14198

That looks like your best option. There is a possibility that your process fails after step 2. In that case you'll end up copying the message twice, but your application should be handling re-delivery of messages (or not caring) anyway.

Upvotes: 8
