Abhishek Patil

Reputation: 1445

Python AWS SQS mocking with MOTO

I am trying to mock an AWS SQS queue with moto. Below is my test code:

import json
import os

import boto3
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs


@mock_sqs
def test_get_all_msg_from_queue():
    
    #from myClass import get_msg_from_sqs
    
    conn = boto3.client('sqs', region_name='us-east-1')
    
    queue = conn.create_queue(QueueName='Test')
    
    os.environ["SQS_URL"] = queue["QueueUrl"]
    
    queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
    
    
    #Tried this as well
    #conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))

    resp = get_msg_from_sqs(queue["QueueUrl"]) 

    assert resp is not None

When I run this, I get the following error:

>       queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E       AttributeError: 'dict' object has no attribute 'send_message'

If I send the message the other way (see the commented-out code under "#Tried this as well"), then the actual SQS call inside my method get_msg_from_sqs fails with the error below:

E  botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation: 
The address https://queue.amazonaws.com/ is not valid for this endpoint.

I am running this on Windows 10 with PyCharm, and the moto version is pinned as:

moto = "^2.2.6"

The code under test is given below:

import boto3

sqs = boto3.client('sqs')

def get_msg_from_queue(queue_url: str) -> dict:
    return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
                               MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)

What am I missing here?

Upvotes: 6

Views: 15074

Answers (2)

gshpychka

Reputation: 11481

Your queue variable is a dict returned by create_queue:

queue = conn.create_queue(QueueName='Test')

It is a plain dict, not a queue object, so you cannot call send_message on it.

To do that, you need to create a queue object:

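import boto3

conn = boto3.client('sqs')
sqs = boto3.resource('sqs')
response = conn.create_queue(QueueName='Test')  # returns a dict
queue_url = response["QueueUrl"]
queue = sqs.Queue(queue_url)  # wrap the URL in a Queue resource object

queue.send_message(MessageBody='test message')  # MessageBody is required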

Upvotes: 4

2e0byo

Reputation: 5934

As per @gshpychka, you need to look at how create_queue works on the client. Specifically, it returns a dict of this form:

Response Structure (dict) --

QueueUrl (string) --

The URL of the created Amazon SQS queue.
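To make the failure in the question concrete, here is a minimal sketch using a hand-written dict shaped like the documented response. The URL is a made-up placeholder, and no AWS or moto call is made; it only shows that the return value is an ordinary dict with a QueueUrl key and no queue methods.

```python
# Hand-written sample shaped like the documented create_queue response.
# The URL is a made-up placeholder, not a real queue.
response = {"QueueUrl": "https://example.invalid/123456789012/Test"}

# The queue URL is just a value stored under the "QueueUrl" key.
queue_url = response["QueueUrl"]

# A dict has no send_message method, which is what the AttributeError reports.
has_send = hasattr(response, "send_message")
print(type(response).__name__, has_send)  # prints "dict False"
```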

So with the client API you would do:

import boto3
from time import sleep

conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
    QueueUrl=queue,
    MessageBody='string',
    ...)

I agree that the docs are confusing. The confusion likely came about because of the SQS resource API, which works differently:

import boto3
from time import sleep

sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)

This works because the resource API's create_queue returns a Queue object, which is probably what you expected.

Please note that @gshpychka had already given the answer in a comment; I just wrote it out.
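On the second error in the question (InvalidAddress from ReceiveMessage): a likely cause, which is an assumption here and not something the asker confirmed, is that the module-level boto3.client('sqs') in the code under test is created at import time, before the mock is active, so the call reaches the real endpoint. One possible way around it is to let the function accept a client, or build one lazily at call time. The sketch below shows that shape; the optional client parameter and StubSqsClient are hypothetical names introduced only for illustration, and the stub stands in for a real or moto-backed client so the example runs without AWS.

```python
import json
from typing import Any, Optional


def get_msg_from_queue(queue_url: str, client: Optional[Any] = None) -> dict:
    """Receive at most one message, building the client lazily if none is given."""
    if client is None:
        # Imported and created at call time, not at module import time,
        # so a mock started by the test would already be in place.
        import boto3
        client = boto3.client("sqs", region_name="us-east-1")
    return client.receive_message(
        QueueUrl=queue_url,
        AttributeNames=["All"],
        MaxNumberOfMessages=1,
        VisibilityTimeout=3600,
        WaitTimeSeconds=0,
    )


class StubSqsClient:
    """Hypothetical stand-in for an SQS client, for illustration only."""

    def __init__(self, body: str) -> None:
        self._body = body
        self.calls = []

    def receive_message(self, **kwargs) -> dict:
        # Record the arguments and return a response-shaped dict.
        self.calls.append(kwargs)
        return {"Messages": [{"Body": self._body}]}


stub = StubSqsClient(json.dumps({"a": "1"}))
resp = get_msg_from_queue("https://example.invalid/queue/Test", client=stub)
```

In a moto-based test, the same function could be given the client created inside the @mock_sqs test (conn in the question's code) in place of the stub.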

Upvotes: 2
