Jack

Reputation: 1941

Apache Flink - Unable to use local Kinesis for FlinkKinesisConsumer

So far I have followed the instructions documented for Flink's Kinesis connector to use a local Kinesis.

Using Non-AWS Kinesis Endpoints for Testing

Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");

With a Flink producer, these instructions work with a local Kinesis (I use Kinesalite).

However, with a Flink consumer, I get an exception saying that aws.region and aws.endpoint cannot both be set. But the region is required, which means it's not possible to override the endpoint.

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.

Is this a bug in the connector? I see a related PR: https://github.com/apache/flink/pull/6045 .

I found a workaround on Flink's mailing list, but it describes this as an issue for the producer rather than the consumer, whereas I see the opposite (I think), so I'm not sure about it. It's really confusing.

Upvotes: 8

Views: 1417

Answers (2)

Dennis Jaheruddin

Reputation: 21563

There has been some progress made since this question was asked.

The asker raised the issue in a Jira ticket, which was marked as a duplicate of a second Jira ticket.

The issue should now be resolved, with the fix available in version 1.10 and above.

Upvotes: 2

alessiosavi

Reputation: 3037

The problem is related to the XOR condition in the validation check. The validateConsumerConfiguration method performs an XOR validation in its if statement, so you can specify only one of the two checked parameters.
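To make the XOR behaviour concrete, here is a minimal sketch of such a check using only java.util.Properties. It is not the connector's source: the class EndpointXorSketch and its methods are hypothetical names, and the two keys are the literal strings quoted in the exception message. It only illustrates why setting both properties fails and why removing the region passes.

```java
import java.util.Properties;

// Hypothetical stand-in for the connector's validation; not Flink's actual code.
class EndpointXorSketch {
    // Key strings as they appear in the exception message from the question.
    static final String AWS_REGION = "aws.region";
    static final String AWS_ENDPOINT = "aws.endpoint";

    // Throws unless exactly one of the two keys is present (XOR).
    static void validate(Properties config) {
        boolean hasRegion = config.containsKey(AWS_REGION);
        boolean hasEndpoint = config.containsKey(AWS_ENDPOINT);
        if (!(hasRegion ^ hasEndpoint)) {
            throw new IllegalArgumentException(
                "either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set");
        }
    }

    // Convenience wrapper: true if validate() accepts the config.
    static boolean passes(Properties config) {
        try {
            validate(config);
            return true;
        } catch (IllegalArgumentException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Properties consumerConfig = new Properties();
        consumerConfig.put(AWS_REGION, "us-east-1");
        consumerConfig.put(AWS_ENDPOINT, "http://localhost:4567");
        System.out.println(passes(consumerConfig)); // both keys set: prints false

        consumerConfig.remove(AWS_REGION);
        System.out.println(passes(consumerConfig)); // endpoint only: prints true
    }
}
```

Under this kind of check, an empty configuration is rejected as well, which is why the error message talks about "either ... or" even when the real cause is that both keys are present.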

To set a custom URL, you need to remove the AWSConfigConstants.AWS_REGION property and use only the endpoint URL.

// Set the given URL
consumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, URL);
// Remove the region
consumerConfig.remove(AWSConfigConstants.AWS_REGION);

This solution fixes the error shown in the following stack trace:

java.lang.IllegalArgumentException: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.

at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.validateConsumerConfiguration(KinesisConfigUtil.java:92)

Upvotes: -1
