Reputation: 23
I’m trying to write a proof of concept that takes messages from Kafka, transforms them using Beam on Flink, and then pushes the results onto a different Kafka topic.
I’ve used the KafkaWindowedWordCountExample as a starting point, and that’s doing the first part of what I want to do, but it outputs to text files as opposed to Kafka. FlinkKafkaProducer08 looks promising, but I can’t figure out how to plug it into the pipeline. I was thinking that it would be wrapped with an UnboundedFlinkSink, or some such, but that doesn’t seem to exist.
Any advice or thoughts on what I’m trying to do?
I’m running the latest incubator-beam (as of last night from GitHub), Flink 1.0.0 in cluster mode, and Kafka 0.9.0.1, all on Google Compute Engine (Debian Jessie).
Upvotes: 2
Views: 1188
Reputation: 814
A sink transform for writing to Kafka was added to Apache Beam / Dataflow in 2016. See the JavaDoc for KafkaIO in Apache Beam for a usage example.
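A minimal write sketch using that KafkaIO sink might look like the following; the broker address and topic name are placeholders, and the exact builder methods can differ slightly between Beam versions:

import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.kafka.common.serialization.StringSerializer;

// "messages" is a hypothetical upstream PCollection<String>
messages.apply(KafkaIO.<Void, String>write()
    .withBootstrapServers("broker-1:9092")         // placeholder broker list
    .withTopic("output-topic")                     // placeholder topic name
    .withValueSerializer(StringSerializer.class)
    .values());                                    // write values only, no keys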
Upvotes: 0
Reputation: 814
There is currently no UnboundedSink class in Beam. Most unbounded sinks are implemented using a ParDo.

You may wish to check out the KafkaIO connector. It is a Kafka reader that works in all Beam runners and implements the parallel reading, checkpointing, and other UnboundedSource APIs. The pull request that added it also includes a crude sink in the TopHashtags example pipeline, writing to Kafka in a ParDo:
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

class KafkaWriter extends DoFn<String, Void> {

  private final String topic;
  private final Map<String, Object> config;
  private transient KafkaProducer<String, String> producer = null;

  public KafkaWriter(Options options) {
    this.topic = options.getOutputTopic();
    this.config = ImmutableMap.<String, Object>of(
        "bootstrap.servers", options.getBootstrapServers(),
        "key.serializer", StringSerializer.class.getName(),
        "value.serializer", StringSerializer.class.getName());
  }

  @Override
  public void startBundle(Context c) throws Exception {
    if (producer == null) { // in Beam, startBundle might be called multiple times.
      producer = new KafkaProducer<String, String>(config);
    }
  }

  @Override
  public void finishBundle(Context c) throws Exception {
    producer.close(); // close() flushes any buffered records first
    producer = null;  // so a reused DoFn instance gets a fresh producer in startBundle
  }

  @Override
  public void processElement(ProcessContext ctx) throws Exception {
    // send() is asynchronous; failures surface when the producer is flushed/closed
    producer.send(new ProducerRecord<String, String>(topic, ctx.element()));
  }
}
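Wiring the writer into a pipeline is a plain ParDo application; a minimal sketch, where "formattedResults" is a hypothetical upstream PCollection<String> and "options" is the pipeline's Options instance:

formattedResults.apply(ParDo.of(new KafkaWriter(options)));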
Of course, we would like to add sink support in KafkaIO as well. It would effectively be the same as the KafkaWriter above, but much simpler to use.
Upvotes: 1