Reputation: 3
I am trying to get familiar with Apache Beam's Kafka IO and am getting the following error:
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:348)
at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:318)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:213)
at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
at com.andrewjones.KafkaConsumerExample.main(KafkaConsumerExample.java:58)
Caused by: org.apache.kafka.common.errors.SerializationException: Size of data received by LongDeserializer is not 8
Following is the piece of code which reads messages from a Kafka topic. I'd appreciate it if someone could provide some pointers.
import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Values;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaConsumerJsonExample {

    static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the Pipeline object with the options we defined above.
        Pipeline p = Pipeline.create(options);

        p.apply(KafkaIO.<Long, String>read()
                .withBootstrapServers("localhost:9092")
                .withTopic("myTopic2")
                .withKeyDeserializer(LongDeserializer.class)
                .withValueDeserializer(StringDeserializer.class)
                .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
                // We're writing to a file, which does not support unbounded data sources.
                // This line bounds the input to the first 5 records.
                // In reality, we would likely write to a sink that supports unbounded data, such as BigQuery.
                .withMaxNumRecords(5)
                .withoutMetadata()) // PCollection<KV<Long, String>>
         .apply(Values.<String>create())
         .apply(TextIO.write().to("wordcounts"));

        System.out.println("running data pipeline");
        p.run().waitUntilFinish();
    }
}
Upvotes: 0
Views: 852
Reputation: 1443
The issue is caused by using LongDeserializer for keys that were apparently written with a serializer other than LongSerializer; the right choice depends on how you produced the records. So you can either use the proper deserializer for your keys or, if the keys don't matter, use StringDeserializer for the keys as well as a workaround.
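For illustration, here is a minimal sketch of that workaround applied to the pipeline from the question (assuming the same topic, bootstrap server, and value format; only the key type and key deserializer change):

p.apply(KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("myTopic2")
        // StringDeserializer accepts key bytes of any length, unlike LongDeserializer,
        // which requires exactly 8 bytes per key.
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class)
        .updateConsumerProperties(ImmutableMap.of("auto.offset.reset", (Object) "earliest"))
        .withMaxNumRecords(5)
        .withoutMetadata()) // PCollection<KV<String, String>>
 .apply(Values.<String>create())
 .apply(TextIO.write().to("wordcounts"));

If you actually need the keys as longs, check the producer's key.serializer configuration instead: LongDeserializer only works when the producer wrote the keys with LongSerializer (i.e. 8 bytes per key).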
Upvotes: 1