Reputation: 4231
I am trying to read kinesis (actually running a local mocking with kinesa) from flink. This is my consumer configuration :
val consumerConfig = new Properties()
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "x")
consumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "x")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val kinesis =
env.addSource(new FlinkKinesisConsumer[String](
inputStreamName, new SimpleStringSchema(), consumerConfig))
.print()
but when placing a record on the stream :
aws --endpoint-url=http://localhost:4567 kinesis put-record --data "hello" --partition-key 0 --stream-name ExampleInputStream
i am getting the following error:
org.apache.flink.kinesis.shaded.com.amazonaws.SdkClientException: Unable to execute HTTP request: Current token (VALUE_STRING) not VALUE_EMBEDDED_OBJECT, can not access as binary
at [Source: (org.apache.flink.kinesis.shaded.com.amazonaws.event.ResponseProgressInputStream); line: -1, column: 304]
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleRetryableException(AmazonHttpClient.java:1201)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1147)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.doExecute(AmazonHttpClient.java:796)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeWithTimer(AmazonHttpClient.java:764)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.execute(AmazonHttpClient.java:738)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.access$500(AmazonHttpClient.java:698)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutionBuilderImpl.execute(AmazonHttpClient.java:680)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:544)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:524)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.doInvoke(AmazonKinesisClient.java:2809)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2776)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.invoke(AmazonKinesisClient.java:2765)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.executeGetRecords(AmazonKinesisClient.java:1292)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClient.getRecords(AmazonKinesisClient.java:1263)
at org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy.getRecords(KinesisProxy.java:247)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.getRecords(ShardConsumer.java:401)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:244)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.fasterxml.jackson.core.JsonParseException: Current token (VALUE_STRING) not VALUE_EMBEDDED_OBJECT, can not access as binary
at [Source: (org.apache.flink.kinesis.shaded.com.amazonaws.event.ResponseProgressInputStream); line: -1, column: 304]
at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1840)
at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:712)
at com.fasterxml.jackson.dataformat.cbor.CBORParser.getBinaryValue(CBORParser.java:1660)
at com.fasterxml.jackson.core.JsonParser.getBinaryValue(JsonParser.java:1484)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.SimpleTypeCborUnmarshallers$ByteBufferCborUnmarshaller.unmarshall(SimpleTypeCborUnmarshallers.java:198)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.SimpleTypeCborUnmarshallers$ByteBufferCborUnmarshaller.unmarshall(SimpleTypeCborUnmarshallers.java:196)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:61)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.RecordJsonUnmarshaller.unmarshall(RecordJsonUnmarshaller.java:29)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.ListUnmarshaller.unmarshallJsonToList(ListUnmarshaller.java:92)
at org.apache.flink.kinesis.shaded.com.amazonaws.transform.ListUnmarshaller.unmarshall(ListUnmarshaller.java:46)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:53)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.transform.GetRecordsResultJsonUnmarshaller.unmarshall(GetRecordsResultJsonUnmarshaller.java:29)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:118)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.JsonResponseHandler.handle(JsonResponseHandler.java:43)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.response.AwsResponseHandlerAdapter.handle(AwsResponseHandlerAdapter.java:69)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleResponse(AmazonHttpClient.java:1714)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleSuccessResponse(AmazonHttpClient.java:1434)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1356)
at org.apache.flink.kinesis.shaded.com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeHelper(AmazonHttpClient.java:1139)
... 20 more
Upvotes: 1
Views: 1598
Reputation: 4231
In my case i added this system properties :
System.setProperty(com.amazonaws.SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
System.setProperty(SDKGlobalConfiguration.AWS_CBOR_DISABLE_SYSTEM_PROPERTY, "true")
System.setProperty(SdkSystemSetting.CBOR_ENABLED.property(), "false")
Upvotes: 1
Reputation: 3634
From the stacktrace it looks like Flink kinesis expects Cbor while your record is writting as a simple string.
It seems like this post contains some hints on how to align your local setup with the consumer side.
Upvotes: 0