user152468

Reputation: 3242

Kafka-Streaming: How to collect pairs of messages and write to a new topic

This is a beginner's question about Kafka Streams.

How would you collect pairs of messages using the Java Kafka Streams library and write them to a new output topic?

I was thinking about something like this:

private void accumulateTwo(KStream<String, String> messages) {
    Optional<String> accumulator = Optional.empty();
    messages.mapValues(value -> {
        if (accumulator.isPresent()) {
            String tmp = accumulator.get();
            accumulator = Optional.empty();
            return Optional.of(new Tuple<>(tmp, value));
        }
        else {
            accumulator = Optional.of(value);
            return Optional.empty();
        }
    }).filter((key, value) -> value.isPresent()).to("pairs");
}

Yet this will not compile, since variables captured by Java lambda expressions must be effectively final.

Any ideas?
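For reference, the compile error can be worked around by wrapping the accumulator in a mutable holder such as an AtomicReference, so that the captured variable itself stays effectively final. The sketch below is plain Java with no Kafka dependency (the class and the pairUp helper are hypothetical names) and only illustrates the intended pairing logic; mutable state held outside the stream is still lost on restart, which is what the answers below address with a state store.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

public class PairingSketch {

    // Groups consecutive values into "(a, b)" pairs. The AtomicReference is a
    // mutable holder that a lambda may capture, because the reference itself
    // is effectively final even though its contents change.
    static List<String> pairUp(List<String> values) {
        AtomicReference<String> accumulator = new AtomicReference<>();
        List<String> pairs = new ArrayList<>();
        values.forEach(value -> {
            String previous = accumulator.getAndSet(null);
            if (previous == null) {
                accumulator.set(value); // first element of a pair: remember it
            } else {
                pairs.add("(" + previous + ", " + value + ")"); // second: emit the pair
            }
        });
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(pairUp(List.of("a", "b", "c", "d"))); // [(a, b), (c, d)]
    }
}
```

An odd trailing element simply stays in the holder and is never emitted, mirroring the behaviour of the accumulator idea in the question.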

Upvotes: 2

Views: 2591

Answers (2)

user152468

Reputation: 3242

EDIT:

As suggested in the comments, three additional steps are necessary:

  1. The Transformer must explicitly store its state in a state store. It obtains a reference to the state store from the ProcessorContext, which is passed to it in the init method.
  2. The state store must be registered with the StreamsBuilder.
  3. The name of the state store must be passed to the transform method.

In this example it is sufficient to store the last message seen. We use a KeyValueStore for this, which holds at most one entry at any point in time.

public class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {

    private String storeName;

    public PairTransformerSupplier(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
        return new PairTransformer<>(storeName);
    }
}


public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
    private ProcessorContext context;
    private String storeName;
    private KeyValueStore<Integer, V> stateStore;

    public PairTransformer(String storeName) {
        this.storeName = storeName;
    }

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore<Integer, V>) context.getStateStore(storeName);
    }

    @Override
    public KeyValue<K, Pair<V, V>> transform(K key, V value) {
        // 1. Update the store to remember the last message seen. 
        if (stateStore.get(1) == null) {
            stateStore.put(1, value); return null;
        }
        KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(stateStore.get(1), value));
        stateStore.put(1, null);
        return result;
    }

    @Override
    public void close() { }

}


public KStream<String, String> sampleStream(StreamsBuilder builder) {
    KStream<String, String> messages = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
    // 2. Create the state store and register it with the streams builder. 
    KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(stateStoreName);
    StoreBuilder<KeyValueStore<Integer, String>> storeBuilder = new KeyValueStoreBuilder<>(
            store,
            new Serdes.IntegerSerde(),
            new Serdes.StringSerde(),
            Time.SYSTEM
    );
    builder.addStateStore(storeBuilder);
    transformToPairs(messages);
    return messages;
}

private void transformToPairs(KStream<String, String> messages) {
    // 3. reference the name of the state store when calling transform(...)
    KStream<String, Pair<String, String>> pairs = messages.transform(
            new PairTransformerSupplier<>(stateStoreName),
            stateStoreName
    );
    KStream<String, Pair<String, String>> filtered = pairs.filter((key, value) -> value != null);
    KStream<String, String> serialized = filtered.mapValues(Pair::toString);
    serialized.to(outputTopic);
}

Changes to the state store are written to a changelog topic (named <application.id>-<store name>-changelog by default), which can be watched using the console consumer:

./bin/kafka-console-consumer --topic <changelog-topic-name> --bootstrap-server localhost:9092

Full source code here: https://github.com/1123/spring-kafka-stream-with-state-store

Original Answer:

The JavaDoc of the org.apache.kafka.streams.kstream.ValueMapper interface states that it is meant for stateless record-by-record transformations, whereas the org.apache.kafka.streams.kstream.Transformer interface is

for stateful mapping of an input record to zero, one, or multiple new output records.

Therefore I guess the Transformer interface is the appropriate choice for collecting pairs of messages. This may only be of relevance in case of failure and restart of a streaming application, which can then recover its state from Kafka.

Hence, here is another solution based upon the org.apache.kafka.streams.kstream.Transformer interface:

class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {

    @Override
    public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
        return new PairTransformer<>();
    }
}

public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
    private V left;

    @Override
    public void init(ProcessorContext context) {
        left = null;
    }

    @Override
    public KeyValue<K, Pair<V, V>> transform(K key, V value) {
        if (left == null) { left = value; return null; }
        KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value));
        left = null;
        return result;
    }

    @Override
    public KeyValue<K, Pair<V, V>> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() { }

}

The PairTransformerSupplier is then used as follows:

private void accumulateTwo(KStream<String, String> messages) {
    messages.transform(new PairTransformerSupplier<>())
            .filter((key, value) -> value != null)
            .mapValues(Pair::toString)
            .to("pairs");
}

Trying out both solutions within a single process on a topic with a single partition yields, however, the exact same results. I have not tried with a topic with multiple partitions and multiple stream consumers.

Upvotes: 4

daniu

Reputation: 14999

You should be able to write an accumulator class

class Accumulator implements ValueMapper<String, Optional<Tuple<String>>> {
    private String key;

    @Override
    public Optional<Tuple<String>> apply(String item) {
        if (key == null) {
            key = item;
            return Optional.empty();
        }
        Optional<Tuple<String>> result = Optional.of(new Tuple<>(key, item));
        key = null;
        return result;
    }
}

and then process with

messages.mapValues(new Accumulator())
        .filter((key, value) -> value.isPresent()) // I don't think your filter is correct
        .to("pairs");

Upvotes: 1
