voutasaurus
voutasaurus

Reputation: 3268

AWS Kinesis: determine whether a named stream exists

My goal is to use the AWS Kinesis API to create a Kinesis stream with a particular name if it doesn't already exist and then write to it whether it was there in the first place or not.

This is what I've come up with so far. Attempt to create the stream. If it fails with code 400 and returns a request ID then maybe the stream already exists. Then write to the stream to make sure it's there. In Go:

k := kinesis.New(session.New())
_, err := k.CreateStream(&kinesis.CreateStreamInput{
    ShardCount: aws.Int64(2),
    StreamName: aws.String("stream"),
})
if err != nil {
    if reqerr, ok := err.(awserr.RequestFailure); ok {
        if reqerr.RequestID() == "" {
            log.Fatal("request was not delivered as it has no ID",
                reqerr.Code(),
                reqerr.Message(),
            )
        }
        if reqerr.StatusCode() != 400 {
            log.Fatal("unexpected status code", reqerr.StatusCode())
        }
    } else {
        log.Fatal(err)
    }
}
// Code 400 + requestID does not necessarily mean that the stream exists
// So write to the stream to confirm it exists
_, err = k.PutRecord(&kinesis.PutRecordInput{
    Data:         []byte("Hello Kinesis"),
    PartitionKey: aws.String("partitionkey"),
    StreamName:   aws.String("stream"),
})
if err != nil {
    log.Fatal(err)
}

The approach above seems convoluted and more importantly I don't think it effectively matches on the exact error I'm expecting. Doing a string compare on the error message seems like a bad choice too because that could easily change.

I'm wondering if there is a more reliable and straightforward way to achieve this? Listing all the available streams to search is a pain because it is a linear search and involves multiple requests with new values of ExclusiveStartStreamName.

Upvotes: 2

Views: 2666

Answers (1)

Mircea
Mircea

Reputation: 10566

Describe the stream. If stream is not there create the stream and spin waiting to become active.

You will not be able to push to the stream immediately after it's created. It's going to transition to CREATING first, and after a while (seconds) to ACTIVE.

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#DescribeStream-instance_method

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#CreateStream-instance_method

You can also use ListStreams to quickly peek at the state of all streams:

https://docs.aws.amazon.com/sdk-for-go/api/service/kinesis/Kinesis.html#ListStreams-instance_method

Upvotes: 1

Related Questions