Reputation: 318
I'm trying to run a simple program which reads from one kinesis stream, does a trivial transformation, and writes the result to another kinesis stream.
Running locally on Flink 1.4.0 (this is the version supported on EMR currently, so no way of upgrading).
Here is the code:
def main(args: Array[String]) {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val consumerConfig = new Properties()
consumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST")
val kinesisMaps = env.addSource(new FlinkKinesisConsumer[String](
"source-stream", new SimpleStringSchema, consumerConfig))
val jsonMaps = kinesisMaps.map { jsonStr => JSON.parseFull(jsonStr).get.asInstanceOf[Map[String, String]] }
val values = jsonMaps.map(jsonMap => jsonMap("field_name"))
values.print()
val producerConfig = new Properties()
producerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
val kinesisProducer = new FlinkKinesisProducer[String](new SimpleStringSchema, producerConfig)
kinesisProducer.setFailOnError(true)
kinesisProducer.setDefaultStream("target-stream")
kinesisProducer.setDefaultPartition("0")
values.addSink(kinesisProducer)
// execute program
env.execute("Flink Kinesis")
}
If I comment out the producing code, the program runs as expected and prints the correct values.
As soon as I add the producer code, I get the following exception:
org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.DaemonException: The child process has been shutdown and can no longer accept messages.
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.Daemon.add(Daemon.java:176)
at org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.producer.KinesisProducer.addUserRecord(KinesisProducer.java:477)
at org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer.invoke(FlinkKinesisProducer.java:248)
at org.apache.flink.streaming.api.functions.sink.SinkFunction.invoke(SinkFunction.java:52)
at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:608)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$BroadcastingOutputCollector.collect(OperatorChain.java:569)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:41)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:549)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:524)
at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:504)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:831)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:809)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:486)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:264)
at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Any idea what's the cause of this?
Upvotes: 0
Views: 2416
Reputation: 318
Apparently, this is an issue with the old version of Amazon KPL which is used in Flink 1.4.
There are at least two possible solutions for this:
Upgrade to Flink version 1.5. You can still use it on EMR, if you install it as described here, under the section Custom EMR Installation: https://ci.apache.org/projects/flink/flink-docs-release-1.5/ops/deployment/aws.html
When building the Kinesis connector for Flink 1.4, you can build it with newer AWS dependencies: I've cherry-picked the aws
dependency changes in pom.xml
of the connector from 1.5, and built the connector with them. Looks like it's working as expected.
Upvotes: 1