Marco
Marco

Reputation: 560

Getting Service: AmazonKinesis; Status Code: 502 with apache-flink and localstack Kinesis

My local setup consists of local apache-flink (installed via brew) and localstack with the Kinesis service running.

my docker-compose has

  localstack:
    image: localstack/localstack:0.10.7
    environment:
      - SERVICES=kinesis
    ports:
      - "4568:4568"

and my Kinesis Consumer:

kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ACCESS_KEY_ID, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_SECRET_ACCESS_KEY, "123");
kinesisConsumerConfig.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "http://localhost:4568");

but when I run the Flink program I get this error:

Caused by: org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.AmazonKinesisException: null (Service: AmazonKinesis; Status Code: 502; Error Code: null; Request ID: null)

It only happens when using localstack. If I connect to my Kinesis stream on my AWS account it works perfectly.

Upvotes: 2

Views: 1985

Answers (3)

fsilvestre
fsilvestre

Reputation: 299

Add these lines before you add your FlinkKinesisConsumer as a source:

System.setProperty("com.amazonaws.sdk.disableCbor", "true") System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "true")

This has the same effect of exporting the ENV vars, but having it in code is less time spent in setting up the environment.

Upvotes: 5

Marco
Marco

Reputation: 560

Turns out we need to disable cbor and cert checking via ENV var and start flink in the same console

export AWS_CBOR_DISABLE=1
DISABLE_CERT_CHECKING_JAVA_OPTS="-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking"
export FLINK_ENV_JAVA_OPTS=${DISABLE_CERT_CHECKING_JAVA_OPTS}
/usr/local/Cellar/apache-flink/1.9.1/libexec/bin/start-cluster.sh

Upvotes: 0

alessiosavi
alessiosavi

Reputation: 3037

If you are using Java, you can use the jar library for simulate some amazon components:

In first instance you need to add the following component in your pom.xml in order to be able to initialize the localstack directly during the test:

<dependency>
    <groupId>cloud.localstack</groupId>
    <artifactId>localstack-utils</artifactId>
    <version>0.2.0</version>
    <scope>test</scope>
</dependency>

Then, you need to specify the following library if you need to use kinesis and dynamo, cause the latest one proivded from aws are not compliant with the latest version of localstack:

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-core</artifactId>
    <version>1.11.642</version>
    <scope>test</scope>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.8.10</version>
    <scope>test</scope>
</dependency>

Now you can use the following annotations in order to instantiate the stack using docker, the images will be pulled automatically if not present in the system. So no is not necessary to run any docker/docker-compose image.

@LocalstackDockerProperties(services = {"kinesis", "dynamodb"})
@ExtendWith(LocalstackDockerExtension.class)
@Slf4j
public class TestPipelineComplete {

public static final String AWS_ACCESS_KEY_ID = "foo";
public static final String AWS_SECRET_ACCESS_KEY = "bar";
    static {
        System.setProperty("AWS_ACCESS_KEY_ID", AWS_ACCESS_KEY_ID);
        System.setProperty("AWS_SECRET_ACCESS_KEY", AWS_SECRET_ACCESS_KEY);
        // These two lines are fundamental
        cloud.localstack.TestUtils.setEnv("aws.cborEnabled", "false");
        cloud.localstack.TestUtils.setEnv("AWS_CBOR_DISABLE", "true");
    }
}

Now, if you need to initialize a DynamoDB client you can use the following line:

final AmazonDynamoDB clientDynamoDB = cloud.localstack.TestUtils.getClientDynamoDB();

Now, if you need to initialize a Kinesis client, you can use the following line:

final AmazonKinesis kinesisClient = cloud.localstack.TestUtils.getClientKinesis();

If you need to read the data from kinesis (test purpouse), you can use the following code snippet as template (https://gist.github.com/alessiosavi/4ea88d73d6853de695843631207b7bc6):

package org.example;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
import com.amazonaws.services.kinesis.model.*;

import java.nio.charset.StandardCharsets;
import java.util.List;

public class App {

    private static final String streamName = "API_NAME" + "_kineis-notification-stream";
    private static final AmazonKinesis client = AmazonKinesisClientBuilder.defaultClient();

    public static void main(String[] args) {
        printKinesisRecords(getRecordsFromKinesis(client));
    }


    private static List<Record> getRecordsFromKinesis(AmazonKinesis kClient) {
        final ListShardsRequest listShardsRequest = new ListShardsRequest().withStreamName(streamName).withMaxResults(1);

        Shard shard = kClient.listShards(listShardsRequest).getShards().get(0);
        GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest();
        getShardIteratorRequest.setStreamName(streamName);
        getShardIteratorRequest.setShardId(shard.getShardId());
        getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON");

        final GetShardIteratorResult getShardIteratorResult = kClient.getShardIterator(getShardIteratorRequest);
        String shardIterator = getShardIteratorResult.getShardIterator();

        // Create a new getRecordsRequest with an existing shardIterator
        // Set the maximum records to return to 1
        GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
        getRecordsRequest.setShardIterator(shardIterator);
        getRecordsRequest.setLimit(10);

        final GetRecordsResult result = kClient.getRecords(getRecordsRequest);
        // Put the result into record list. The result can be empty.
        return result.getRecords();
    }

    private static void printKinesisRecords(List<Record> records) {
        for (Record record : records) {
            System.err.println("RECORD: " + StandardCharsets.UTF_8.decode(record.getData()).toString());
        }
    }
}

Upvotes: 1

Related Questions