Reputation: 1809
I've implemented a Beam pipeline reading from Kafka, based on the docs here: https://github.com/apache/beam/blob/master/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java#L125
The pipeline itself works fine for bounded sources and I have test cases where it reads from files without issue.
The code reading from Kafka is very simple, and basically identical to the example:
PCollection<String> input = p.apply(KafkaIO.<Long, String>read()
    .withBootstrapServers(KAFKA_BROKER)
    .withTopics(Arrays.asList(KAFKA_READ_TOPIC))
    .withKeyCoder(BigEndianLongCoder.of())
    .withValueCoder(StringUtf8Coder.of())
    .withTimestampFn(new TimestampKafkaStrings())
    .withoutMetadata())
    .apply(Values.<String>create());
The application starts fine and seems to connect to Kafka. However, as soon as I write to Kafka from another process and the pipeline starts reading, I get the following exception on the first read:
INFO: Kafka version : 0.10.2.0
Apr 04, 2017 9:46:18 AM org.apache.kafka.common.utils.AppInfoParser$AppInfo <init>
INFO: Kafka commitId : 576d93a8dc0cf421
Apr 04, 2017 9:46:30 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader advance
INFO: Reader-0: first record offset 2000
Apr 04, 2017 9:46:30 AM org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader consumerPollLoop
INFO: Reader-0: Returning from consumer pool loop
[WARNING]
java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:293)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.RuntimeException: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:453)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:350)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:71)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:210)
at com.groupbyinc.beam.SessionRollup.main(SessionRollup.java:186)
... 6 more
Caused by: org.apache.beam.sdk.coders.CoderException: java.io.EOFException
at org.apache.beam.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:64)
at org.apache.beam.sdk.coders.BigEndianLongCoder.decode(BigEndianLongCoder.java:33)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.decode(KafkaIO.java:1018)
at org.apache.beam.sdk.io.kafka.KafkaIO$UnboundedKafkaReader.advance(KafkaIO.java:989)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.startReader(UnboundedReadEvaluatorFactory.java:190)
at org.apache.beam.runners.direct.UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.processElement(UnboundedReadEvaluatorFactory.java:128)
at org.apache.beam.runners.direct.TransformExecutor.processElements(TransformExecutor.java:139)
at org.apache.beam.runners.direct.TransformExecutor.run(TransformExecutor.java:107)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
It seems like there is something wrong with the way the key decoder is attempting to read the Kafka message key. In the source data these keys are not being set explicitly, so I assume they are defaulting to timestamps within Kafka(?).
Any ideas on how to debug this issue further? Or resources I can look at? Functioning examples?
EDIT: Removing the .withTimestampFn() portion of the pipeline has no effect; the code seems to fail before it ever gets to that point.
Upvotes: 3
Views: 2601
Reputation: 1809
The answer is that the key is not a long. It seems that by default, the key is a random hash, which is a String. Weird that the Beam KafkaIO library wouldn't handle the default Kafka use case out of the box.
So my theory is that when the BigEndianLongCoder attempts to decode the key, it hits EOF: a long is 8 bytes, but the string key is shorter than that, so the stream runs out of bytes before the coder has read enough for a full long.
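That theory is easy to check in isolation. Here is a minimal sketch (assuming a Beam SDK where Coder.decode(InputStream) is available as a single-argument method) that reproduces the CoderException: java.io.EOFException from the stack trace by feeding the coder a key shorter than 8 bytes:
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.BigEndianLongCoder;

// A string key shorter than the 8 bytes a big-endian long requires.
byte[] shortKey = "abc".getBytes(StandardCharsets.UTF_8);
// BigEndianLongCoder reads exactly 8 bytes; a 3-byte stream runs dry,
// so this call throws a CoderException caused by java.io.EOFException.
BigEndianLongCoder.of().decode(new ByteArrayInputStream(shortKey));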
So my fixed code is below:
PCollection<String> input = p.apply(KafkaIO.readBytes()
    .withBootstrapServers(KAFKA_BROKER)
    .withTopics(Arrays.asList(KAFKA_READ_TOPIC))
    .withTimestampFn(new TimestampKafkaStrings())
    .withoutMetadata())
    .apply(Values.<byte[]>create())
    .apply(ParDo.of(new BytesToString()));
The important detail is to call readBytes() instead of read(), and then parse the bytes into a string yourself.
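For completeness, BytesToString in the snippet above is just a simple DoFn; something along these lines works (this is a sketch, assuming the values are plain UTF-8):
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.transforms.DoFn;

// Decodes each Kafka value's raw bytes as a UTF-8 string.
static class BytesToString extends DoFn<byte[], String> {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(new String(c.element(), StandardCharsets.UTF_8));
  }
}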
In my case, I ran into another issue after that because the strings being read are stringified JSON from a Node process. For some reason, Jackson could not process the escaped JSON as it came in from Kafka, so it had to be unescaped first, then parsed.
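If you hit the same thing, one possible workaround (a rough sketch; the exact payload shape here is an assumption, not taken from my actual data) is to let Jackson read the message once as a JSON string value, which unescapes it, and then parse the result:
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

// If the producer effectively double-stringified the payload, the message
// arrives as an escaped JSON string such as "{\"user\":\"a\"}".
ObjectMapper mapper = new ObjectMapper();
String escaped = "\"{\\\"user\\\":\\\"a\\\"}\"";            // example input
String unescaped = mapper.readValue(escaped, String.class); // first pass unescapes
JsonNode json = mapper.readTree(unescaped);                 // second pass parses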
All of this points to a weakness in the Apache Beam KafkaIO library, though. The examples given for its use are inaccurate and don't work in simple default cases. Furthermore, because it's so new, very few people have put examples of its use online, so it can be challenging to find solutions when you have a problem.
I should really just submit a pull request with better examples.
Upvotes: 3