MeetJoeBlack

Reputation: 2904

Apache Beam Combine grouped values

I'm trying to find a way to reorder my Kafka messages and send the ordered messages to a new topic, using Apache Beam in conjunction with Google Dataflow.

I have a Kafka publisher that sends String messages in the following format: {system_timestamp}-{event_name}?{parameters}

For example:

1494002667893-client.message?chatName=1c&messageBody=hello
1494002656558-chat.started?chatName=1c&chatPatricipants=3

What I want to do is reorder events within a 5-second window based on the {system_timestamp} part of the message, because our publishers don't guarantee that messages are sent in {system_timestamp} order.

I've written a mock sorter function that sorts the events received from Kafka (using the KafkaIO source):

static class SortEventsFunc extends DoFn<KV<String, Iterable<String>>, KV<String, Iterable<String>>> {

   @ProcessElement
   public void processElement(ProcessContext c) {
       KV<String, Iterable<String>> element = c.element();

       System.out.println("");
       System.out.print("key: " + element.getKey() + ";");

       Iterator<String> it = element.getValue().iterator();
       List<String> list = new ArrayList<>();
       while (it.hasNext()) {
           String val = it.next();
           System.out.print("value: " + val);
           list.add(val);
       }
       Collections.sort(list, Comparator.naturalOrder());
       c.output(KV.of(element.getKey(), list));
   }
 }

public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.create();

    DirectOptions directOptions = options.as(DirectOptions.class);
    directOptions.setRunner(DirectRunner.class);

    // Create the Pipeline object with the options we defined above.
    Pipeline pipeline = Pipeline.create(options);
    pipeline
        // read from Kafka
        .apply(KafkaIO.<String,String>read()
            .withBootstrapServers("localhost:9092")
            .withTopics(new ArrayList<>((Arrays.asList("events"))))
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withoutMetadata())
        // apply window
        .apply(Window.<KV<String,String>>into(
                FixedWindows.of(Duration.standardSeconds(5L))))
        // group by key before sorting
        .apply(GroupByKey.<String, String>create()) // returns PCollection<KV<String, Iterable<String>>>
        // sort events
        .apply(ParDo.of(new SortEventsFunc()))
        //combine KV<String, Iterable<String>> input to KafkaIO acceptable KV<String, String> format
        .apply(Combine.perKey()) //:TODO somehow convert KV<String, Iterable<String>> to KV<String, String>
        // write ordered events to Kafka
        .apply(KafkaIO.<String, String>write()
                .withBootstrapServers("localhost:9092")
                .withTopic("events-sorted")
                .withKeySerializer(StringSerializer.class)
                .withValueSerializer(StringSerializer.class)
            );
    pipeline.run();
}

So I've grouped the messages using the GroupByKey.<String, String>create() transform, but after sorting the events I need to somehow convert them from KV<String, Iterable<String>> to the KV<String, String> or KV<Void, String> values that KafkaIO accepts. All I want to do is ignore the keys created by the grouping transform and pass each value as a separate message to the KafkaIO writer.

I explored the Combine#perKey transform, but it accepts a SerializableFunction that can only combine all values into one String (with some delimiter). As a result, I pass a single concatenated string to the KafkaIO writer instead of each value that was read by KafkaIO#read().

Upvotes: 1

Views: 3358

Answers (1)

Alexey

Reputation: 318

It's quite simple actually! The trick here is that you can call c.output as many times as you wish, inside the @ProcessElement method.

So in your case, just define a DoFn<KV<String, Iterable<String>>, KV<String, String>>, iterate over the c.element().getValue() collection, and call c.output for each one of them.
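
To make the idea concrete, here is a minimal sketch of the sort-then-emit logic in plain Java, with no Beam dependency so it can be run on its own. The class name FlattenSorted and the helpers emitSorted and timestampOf are hypothetical names made up for this illustration, and the Consumer<String> stands in for c.output. It also sorts by the numeric timestamp prefix rather than by Comparator.naturalOrder() as in the question, which is my assumption about the intended ordering.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper class: models what the body of @ProcessElement would do.
class FlattenSorted {

    // Sorts the grouped values by their numeric timestamp prefix and hands
    // each value to `emit` separately (one call per message).
    static void emitSorted(Iterable<String> values, Consumer<String> emit) {
        List<String> sorted = new ArrayList<>();
        values.forEach(sorted::add);
        sorted.sort(Comparator.comparingLong(FlattenSorted::timestampOf));
        for (String value : sorted) {
            emit.accept(value);
        }
    }

    // Assumes the "{system_timestamp}-{event_name}?{parameters}" format from
    // the question: everything before the first '-' is the timestamp.
    static long timestampOf(String message) {
        return Long.parseLong(message.substring(0, message.indexOf('-')));
    }

    public static void main(String[] args) {
        List<String> grouped = Arrays.asList(
            "1494002667893-client.message?chatName=1c&messageBody=hello",
            "1494002656558-chat.started?chatName=1c&chatPatricipants=3");
        // Prints the chat.started message first, then client.message.
        emitSorted(grouped, System.out::println);
    }
}
```

Inside a DoFn<KV<String, Iterable<String>>, KV<String, String>>, the equivalent call would be roughly emitSorted(c.element().getValue(), v -> c.output(KV.of(c.element().getKey(), v))), which replaces both SortEventsFunc and the Combine.perKey() step. Keep in mind that Beam does not in general promise any ordering between the elements of a PCollection, so the order of the c.output calls may not be preserved all the way to the Kafka topic.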

Upvotes: 3
