Reputation: 1941
So far I have followed the instructions documented for Flink's kinesis connector to use a local Kinesis.
Properties producerConfig = new Properties();
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1");
producerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "aws_access_key_id");
producerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "aws_secret_access_key");
producerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4567");
With a Flink producer, these instructions work with a local kinesis (I use Kinesalite).
However, with a Flink consumer, I get an exception that aws.region
and aws.endpoint
are not both allowed. But region is required, which means its not possible to override the endpoint.
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.
Is this a bug in the connector? I see a related PR: https://github.com/apache/flink/pull/6045 .
I found a workaround on Flink's mailing list, but they describe this as an issue for the producer and not the consumer, whereas i see the opposite (i think), so not sure about this. It's really confusing.
Upvotes: 8
Views: 1417
Reputation: 21563
There has been some progress made since this question was asked.
The asker pushed the issue in this jira which was marked as a duplicate of this second jira .
The issue should now be resolved, and the fix available for version 1.10 and above.
Upvotes: 2
Reputation: 3037
The problem is related to the XOR
condition in the validation check.
As you can see, the validateConsumerConfiguration
method perform an XOR validation in the if statement. So, you can only specify one of the two parameters checked.
For set a custom URL, you need to remove the AWSConfigConstants.AWS_REGION
properties and use only the link.
// Set the given URL
consumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, URL);
// Remove the region
consumerConfig.remove(AWSConfigConstants.AWS_REGION);
This solution, fix the error related to the following StackTrace:
java.lang.IllegalArgumentException: For FlinkKinesisConsumer either AWS region ('aws.region') or AWS endpoint ('aws.endpoint') must be set in the config.
at org.apache.flink.streaming.connectors.kinesis.util.KinesisConfigUtil.validateConsumerConfiguration(KinesisConfigUtil.java:92)
Upvotes: -1