user9910093
user9910093

Reputation: 149

How to configuring AWS Firehose with moto (Python AWS mocking library)

I am creating my firehose resource like this, as well as an s3 bucket with name self.problem_reporter_bucket_name. But, after calling put_record, there is nothing in my bucket. That is, when I call list_objects on my bucket, there are no items.

self.firehose.create_delivery_stream(
  DeliveryStreamName=self.problem_reporter_delivery_stream_name,
  S3DestinationConfiguration={
    'RoleARN': 'arn:aws:iam::123456789012:role/firehose_delivery_role',
    'BucketARN': 'arn:aws:s3:::' + self.problem_reporter_bucket_name,
    'Prefix': 'myPrefix',
    'BufferingHints': {
      'SizeInMBs': 1,
      'IntervalInSeconds': 60
    },
    'CompressionFormat': 'UNCOMPRESSED',
 })
)

Does even moto support my use case?

Upvotes: 2

Views: 1443

Answers (1)

kosumi
kosumi

Reputation: 11

moto doesn't seem to support Firehose. I did something like this to test my Firehose related code;

If firehose resource is defined in the module that is tested:

from unittest.mock import patch, MagicMock
@patch('mymodule.boto3')
def test_put_record(boto3):
    record = {'ID": "123'}
    my_put_firehose_record(record)

    all_args = {
        'DeliveryStreamName': 'my-test-firehose-stream',
        'Record': record
    }

    boto3.client.assert_called_with('firehose')
    boto3.client().put_record.assert_called_with(**all_args)

If firehouse resource is defined outside the module that is tested:

from unittest.mock import patch, MagicMock
def test_put_record():
    firehose = MagicMock()
    record = {'ID': '123'}
    my_put_firehose_record_external(firehose, record)

    all_args = {
        'DeliveryStreamName': 'my-test-firehose-stream',
        'Record': record
    }

    firehose.put_record.assert_called_with(**all_args)

Upvotes: 1

Related Questions