Reputation: 1445
I am trying to mock an AWS SQS queue with moto. My test code is below:
import json
import os

import boto3
from myClass import get_msg_from_sqs
from moto import mock_sqs
#from moto.sqs import mock_sqs

@mock_sqs
def test_get_all_msg_from_queue():
    #from myClass import get_msg_from_sqs
    conn = boto3.client('sqs', region_name='us-east-1')
    queue = conn.create_queue(QueueName='Test')
    os.environ["SQS_URL"] = queue["QueueUrl"]
    queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
    #Tried this as well
    #conn.send_message(QueueUrl=queue["QueueUrl"], MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
    resp = get_msg_from_sqs(queue["QueueUrl"])
    assert resp is not None
While executing this I am getting the following error
> queue.send_message( MessageBody=json.dumps({'a': '1', 'b': '2', 'c': '3'}))
E AttributeError: 'dict' object has no attribute 'send_message'
If I send the message the other way (see the commented-out code under #Tried this as well), then when my method get_msg_from_sqs makes the actual SQS call, I get the error below:
E botocore.exceptions.ClientError: An error occurred
(InvalidAddress) when calling the ReceiveMessage operation:
The address https://queue.amazonaws.com/ is not valid for this endpoint.
I am running it on win10 with PyCharm and the moto version is set to
moto = "^2.2.6"
My code is given below
import boto3

sqs = boto3.client('sqs')

def get_msg_from_sqs(queue_url: str) -> dict:
    return sqs.receive_message(QueueUrl=queue_url, AttributeNames=['All'],
                               MaxNumberOfMessages=1, VisibilityTimeout=3600, WaitTimeSeconds=0)
What am I missing over here?
Upvotes: 6
Views: 15074
Reputation: 11481
Your queue variable is a dict returned by create_queue:
queue = conn.create_queue(QueueName='Test')
It is not a Queue object, so you cannot call send_message on it.
To do that, you need to create a queue object:
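import boto3

conn = boto3.client('sqs')
sqs = boto3.resource('sqs')
# create_queue on the client returns a dict; keep only the URL
response = conn.create_queue(QueueName='Test')
queue_url = response["QueueUrl"]
# wrap the URL in a resource-level Queue object, which has send_message
queue = sqs.Queue(queue_url)
queue.send_message(MessageBody='string')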
Upvotes: 4
Reputation: 5934
As per @gshpychka, you need to look at how create_queue works. Specifically, it returns a dict of this form:
Response Structure (dict) --
QueueUrl (string) --
The URL of the created Amazon SQS queue.
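A quick way to see why the call in the question fails, without touching AWS or moto: stand in for the create_queue response with a plain dict. This is only a sketch; the URL is a made-up placeholder, and nothing here talks to SQS.

```python
# Hypothetical stand-in for what the client's create_queue returns:
# a plain dict with a "QueueUrl" key (the URL itself is a placeholder).
response = {"QueueUrl": "https://queue.amazonaws.com/123456789012/Test"}

# Reading the URL out of the dict works fine.
queue_url = response["QueueUrl"]

# Calling send_message on the dict does not, because a dict has no such method.
try:
    response.send_message(MessageBody="hello")
except AttributeError as exc:
    error = str(exc)

print(error)  # 'dict' object has no attribute 'send_message'
```

That is the same AttributeError as in the question, so the value to pass around is the "QueueUrl" string, not the response itself.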
So with the client API you would do:
import boto3
from time import sleep

conn = boto3.client('sqs')
queue = conn.create_queue(QueueName="Test")["QueueUrl"]
sleep(1) # wait at least 1s before using queue
response = conn.send_message(
    QueueUrl=queue,
    MessageBody='string',
    # other optional parameters omitted
)
I agree that the docs are confusing. The confusion likely came about because of the sqs resource api, which works differently:
import boto3
from time import sleep
sqs = boto3.resource('sqs')
queue = sqs.create_queue(QueueName="Test2")
sleep(1)
queue.send_message(...)
This works because this API returns a Queue object, which is probably what you expected.
Please note that @gshpychka had already given the answer in a comment; I just wrote it out.
Upvotes: 2