Reputation: 3242
This is a beginner's question about Kafka Streams.
How would you collect pairs of messages using the Java Kafka Streams library and write them to a new output topic?
I was thinking about something like this:
private void accumulateTwo(KStream<String, String> messages) {
Optional<String> accumulator = Optional.empty();
messages.mapValues(value -> {
if (accumulator.isPresent()) {
String tmp = accumulator.get();
accumulator = Optional.empty();
return Optional.of(new Tuple<>(tmp, value));
}
else {
accumulator = Optional.of(value);
return Optional.empty();
}
}).filter((key, value) -> value.isPresent()).to("pairs");
}
Yet this will not work, since local variables captured by a Java lambda expression must be final or effectively final, so accumulator cannot be reassigned inside the lambda.
Any ideas?
Upvotes: 2
Views: 2591
Reputation: 3242
As suggested in the comments, three additional steps are necessary:
1. The Transformer must explicitly store its state within a state store. It gets a reference to the state store from the ProcessorContext, which is passed to it in the init method.
2. The state store must be created and registered with the StreamsBuilder.
3. The name of the state store must be passed to the transform method.
In this example it is sufficient to store the last message we have seen. We use a KeyValueStore for this, which holds exactly zero or one entry at any point in time.
public class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {
private String storeName;
public PairTransformerSupplier(String storeName) {
this.storeName = storeName;
}
@Override
public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
return new PairTransformer<>(storeName);
}
}
public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
private ProcessorContext context;
private String storeName;
private KeyValueStore<Integer, V> stateStore;
public PairTransformer(String storeName) {
this.storeName = storeName;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
stateStore = (KeyValueStore<Integer, V>) context.getStateStore(storeName);
}
@Override
public KeyValue<K, Pair<V, V>> transform(K key, V value) {
// 1. Use the store to remember the first message of a pair.
V left = stateStore.get(1);
if (left == null) {
stateStore.put(1, value);
return null; // no pair yet, so nothing is emitted
}
KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value));
stateStore.delete(1); // the pair is complete, so clear the store
return result;
}
@Override
public void close() { }
}
public KStream<String, String> sampleStream(StreamsBuilder builder) {
KStream<String, String> messages = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
// 2. Create the state store and register it with the streams builder.
KeyValueBytesStoreSupplier store = Stores.persistentKeyValueStore(stateStoreName);
// Stores.keyValueStoreBuilder is the public factory for a key-value StoreBuilder.
StoreBuilder<KeyValueStore<Integer, String>> storeBuilder =
Stores.keyValueStoreBuilder(store, Serdes.Integer(), Serdes.String());
builder.addStateStore(storeBuilder);
transformToPairs(messages);
return messages;
}
private void transformToPairs(KStream<String, String> messages) {
// 3. reference the name of the state store when calling transform(...)
KStream<String, Pair<String, String>> pairs = messages.transform(
new PairTransformerSupplier<>(stateStoreName),
stateStoreName
);
KStream<String, Pair<String, String>> filtered = pairs.filter((key, value) -> value != null);
KStream<String, String> serialized = filtered.mapValues(Pair::toString);
serialized.to(outputTopic);
}
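The pairing logic in transform can be tried without a broker. The following is a minimal sketch in plain Java, not the Kafka Streams API: StorePairer and StorePairerDemo are hypothetical names, a HashMap stands in for the KeyValueStore, and record keys are left out for brevity. It assumes the store contents survive a restart, as a changelog-backed store is expected to; this is modelled by handing the same map to a new StorePairer instance.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for PairTransformer: a Map plays the role of the KeyValueStore.
class StorePairer<V> {
    private static final int SLOT = 1;      // the single store key, as in the answer
    private final Map<Integer, V> store;    // holds zero or one pending value

    StorePairer(Map<Integer, V> store) {
        this.store = store;
    }

    // Returns null for the first value of a pair, the completed pair for the second.
    Map.Entry<V, V> transform(V value) {
        V left = store.get(SLOT);
        if (left == null) {
            store.put(SLOT, value);
            return null;
        }
        store.remove(SLOT);
        return new SimpleEntry<>(left, value);
    }
}

class StorePairerDemo {
    // Feeds all values through the pairer and keeps only the completed pairs.
    static <V> List<Map.Entry<V, V>> run(StorePairer<V> pairer, List<V> values) {
        List<Map.Entry<V, V>> pairs = new ArrayList<>();
        for (V value : values) {
            Map.Entry<V, V> pair = pairer.transform(value);
            if (pair != null) {
                pairs.add(pair);
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        Map<Integer, String> store = new HashMap<>();
        // "c" has no partner yet and stays in the store.
        System.out.println(run(new StorePairer<>(store), Arrays.asList("a", "b", "c"))); // prints [a=b]
        // A new instance over the same map models a restart with restored state.
        System.out.println(run(new StorePairer<>(store), Arrays.asList("d")));           // prints [c=d]
    }
}
```

With a plain field instead of the map, the pending "c" would be lost when the instance is replaced, and "d" would become the first half of the next pair.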
Changes to the state store can be watched using the console consumer:
./bin/kafka-console-consumer --topic <changelog-topic-name> --bootstrap-server localhost:9092
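The <changelog-topic-name> placeholder depends on the application. Kafka Streams names a store's changelog topic <application.id>-<store name>-changelog; the helper below is only a small sketch of that convention. The class name ChangelogTopics and the example values my-app and pair-store are made up for illustration and are not taken from the linked project.

```java
// Hypothetical helper; only the naming pattern comes from Kafka Streams.
class ChangelogTopics {
    // Builds the changelog topic name for a store of a given streams application.
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        // "my-app" and "pair-store" are example values only.
        System.out.println(changelogTopic("my-app", "pair-store")); // prints my-app-pair-store-changelog
    }
}
```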
Full source code here: https://github.com/1123/spring-kafka-stream-with-state-store
The JavaDoc of the org.apache.kafka.streams.kstream.ValueMapper interface states that it is for stateless record-by-record transformations, and that the org.apache.kafka.streams.kstream.Transformer interface, on the other hand, is
"for stateful mapping of an input record to zero, one, or multiple new output records."
Therefore I guess the Transformer interface is the appropriate choice for collecting pairs of messages. The distinction may only matter when a streaming application fails and restarts, so that it can recover its state from Kafka.
Hence, here is another solution based upon the org.apache.kafka.streams.kstream.Transformer interface:
class PairTransformerSupplier<K,V> implements TransformerSupplier<K, V, KeyValue<K, Pair<V,V>>> {
@Override
public Transformer<K, V, KeyValue<K, Pair<V, V>>> get() {
return new PairTransformer<>();
}
}
public class PairTransformer<K,V> implements Transformer<K, V, KeyValue<K, Pair<V, V>>> {
private V left;
@Override
public void init(ProcessorContext context) {
left = null;
}
@Override
public KeyValue<K, Pair<V, V>> transform(K key, V value) {
if (left == null) { left = value; return null; }
KeyValue<K, Pair<V,V>> result = KeyValue.pair(key, new Pair<>(left, value));
left = null;
return result;
}
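@Override
public KeyValue<K, Pair<V, V>> punctuate(long timestamp) {
// Only needed on older Kafka Streams versions; newer versions no longer declare punctuate on Transformer.
return null;
}
@Override
public void close() { }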
}
The PairTransformerSupplier is then used as follows:
private void accumulateTwo(KStream<String, String> messages) {
messages.transform(new PairTransformerSupplier<>())
.filter((key, value) -> value != null)
.mapValues(Pair::toString)
.to("pairs");
}
However, trying out both solutions within a single process on a topic with a single partition yields exactly the same results. I have not tried a topic with multiple partitions and multiple stream consumers.
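For the untested multi-partition case, the design of Kafka Streams suggests what to expect: it creates one task per input partition and obtains a separate Transformer from the TransformerSupplier for each task, so pairs should form within a partition and not across partitions. The sketch below simulates that in plain Java. FieldPairer, PartitionedPairing and the partition assignment are invented for illustration and do not use the Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the field-based PairTransformer.
class FieldPairer {
    private String left;

    // Returns null while waiting for the second value of a pair.
    String transform(String value) {
        if (left == null) {
            left = value;
            return null;
        }
        String pair = "(" + left + "," + value + ")";
        left = null;
        return pair;
    }
}

class PartitionedPairing {
    // Each record is {partition, value}; one FieldPairer per partition models one task each.
    static List<String> pairUp(List<String[]> records) {
        Map<String, FieldPairer> pairerPerPartition = new LinkedHashMap<>();
        List<String> pairs = new ArrayList<>();
        for (String[] record : records) {
            FieldPairer pairer = pairerPerPartition.computeIfAbsent(record[0], p -> new FieldPairer());
            String pair = pairer.transform(record[1]);
            if (pair != null) {
                pairs.add(pair);
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        // Arrival order is a, b, c, d, but a and c are on partition 0, b and d on partition 1.
        List<String[]> records = Arrays.asList(
                new String[] {"0", "a"},
                new String[] {"1", "b"},
                new String[] {"0", "c"},
                new String[] {"1", "d"});
        System.out.println(pairUp(records)); // prints [(a,c), (b,d)]
    }
}
```

With a single partition the same input yields (a,b) and (c,d), which matches the single-partition observation above.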
Upvotes: 4
Reputation: 14999
You should be able to write an accumulator class:
class Accumulator implements ValueMapper<String, Optional<Tuple<String>>> {
private String key;
@Override
public Optional<Tuple<String>> apply(String item) {
if (key == null) {
key = item;
return Optional.empty();
}
Optional<Tuple<String>> result = Optional.of(new Tuple<>(key, item));
key = null;
return result;
}
}
and then process with
messages.mapValues(new Accumulator())
.filter((key, value) -> value.isPresent()) // KStream.filter takes a (key, value) predicate, so Optional::isPresent does not fit here
.to("pairs");
Upvotes: 1