Reputation: 23145
I am trying to build a custom state store that maps each key to a map of values.
Stream & Store configuration
final Serde<HashMap<String, ?>> userSessionsSerde = Serdes.serdeFrom(new HashMapSerializer(), new HashMapDeserializer());
StoreBuilder sessionStoreBuilder = Stores.keyValueStoreBuilder(Stores.persistentKeyValueStore(storeName), Serdes.String(), userSessionsSerde);
builder.addStateStore(sessionStoreBuilder);

builder.stream("connection-events", Consumed.with(Serdes.String(), wsSerde))
    .transform(wsEventTransformerSupplier, storeName)
    .to("status-changes", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), properties);
streams.start();
Transformer
public class WSEventProcessor implements Transformer<String, ConnectionEvent, KeyValue<String, String>> {
    private String storeName = "user-sessions";
    private KeyValueStore<String, Map<String, ConnectionEvent>> stateStore;
    final Serde<HashMap<String, ?>> userSessionsSerde = Serdes.serdeFrom(new HashMapSerializer(), new HashMapDeserializer());

    @SuppressWarnings("unchecked")
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        stateStore = (KeyValueStore<String, Map<String, ConnectionEvent>>) context.getStateStore(storeName);
    }

    @Override
    public void close() {
    }

    @Override
    public KeyValue<String, String> transform(String key, ConnectionEvent value) {
        boolean sendUpdate = false;
        //Send null if there are no updates to be sent to downstream processors
        if (value.getState() == WebSocketConnection.CONNECTED) {
            if (stateStore.get(key) == null) {
                stateStore.put(key, new HashMap<>());
                sendUpdate = true;
            }
            stateStore.get(key).put(value.getSessionId(), value);
            return sendUpdate ? KeyValue.pair(key, "Online") : null;
        }
        else {
            stateStore.get(key).remove(value.getSessionId());
            int size = stateStore.get(key).size();
            return stateStore.get(key).isEmpty() ? KeyValue.pair(key, "Offline") : null;
        }
    }
}
The state store always holds an empty map for every key, regardless of the connected and disconnected events it receives. Am I doing something wrong?
Upvotes: 0
Views: 1256
Reputation: 3955
The value object you pass to stateStore.put(key, value) and the object returned by stateStore.get(key) are different objects, because the value is serialized on write and deserialized on read.

Your issue comes from modifying the object returned by the state store: stateStore.get(key).put(value.getSessionId(), value) and stateStore.get(key).remove(value.getSessionId()). Updating the object returned by stateStore.get(key) changes only that in-memory copy; nothing is written back to the state store.
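To make the copy-on-read behaviour concrete, here is a minimal sketch that needs no Kafka at all. SerializingStore is a hypothetical stand-in for a key-value store, and Java serialization is only an assumption standing in for the custom HashMapSerializer/HashMapDeserializer from the question. It is not how Kafka Streams is implemented internally, but it reproduces the same symptom.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class CopyOnReadStoreDemo {

    /** Hypothetical toy store: keeps only bytes, like a store backed by serdes. */
    static final class SerializingStore {
        private final Map<String, byte[]> bytesByKey = new HashMap<>();

        void put(String key, HashMap<String, String> value) {
            bytesByKey.put(key, serialize(value));
        }

        /** Every call deserializes a fresh copy of the stored value. */
        HashMap<String, String> get(String key) {
            byte[] bytes = bytesByKey.get(key);
            return bytes == null ? null : deserialize(bytes);
        }

        private static byte[] serialize(HashMap<String, String> value) {
            try (ByteArrayOutputStream buffer = new ByteArrayOutputStream();
                 ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
                out.flush();
                return buffer.toByteArray();
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        @SuppressWarnings("unchecked")
        private static HashMap<String, String> deserialize(byte[] bytes) {
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
                return (HashMap<String, String>) in.readObject();
            } catch (IOException | ClassNotFoundException e) {
                throw new IllegalStateException(e);
            }
        }
    }

    /** Mirrors the question: mutate whatever get() returns and never write it back. */
    static int sizeAfterMutatingReturnedCopy() {
        SerializingStore store = new SerializingStore();
        store.put("user-1", new HashMap<>());
        store.get("user-1").put("session-a", "CONNECTED"); // changes a throwaway copy
        return store.get("user-1").size();
    }

    /** Read, modify, then put the modified value back. */
    static int sizeAfterReadModifyWrite() {
        SerializingStore store = new SerializingStore();
        store.put("user-1", new HashMap<>());
        HashMap<String, String> sessions = store.get("user-1");
        sessions.put("session-a", "CONNECTED");
        store.put("user-1", sessions); // persists the change
        return store.get("user-1").size();
    }

    public static void main(String[] args) {
        System.out.println(sizeAfterMutatingReturnedCopy()); // prints 0
        System.out.println(sizeAfterReadModifyWrite());      // prints 1
    }
}
```

The first method leaves the stored map empty, which matches the "always 0 size" symptom in the question; the second shows that an explicit put after the modification is what makes the change stick.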
So, to fix your issue, compute the new value first (in your case the HashMap), and only then call stateStore.put(key, calculated_value). If you need to remove a key-value pair from the state store, use stateStore.put(key, null). Your transform method should look approximately like:
// Needs java.util.HashMap, java.util.Map and java.util.Optional
public KeyValue<String, String> transform(String key, ConnectionEvent value) {
    // get() returns a deserialized copy of the stored map
    Map<String, ConnectionEvent> valueFromStateStore = stateStore.get(key);
    // Start from a new mutable map when the key has no entry yet
    Map<String, ConnectionEvent> valueToUpdate = Optional.ofNullable(valueFromStateStore).orElseGet(HashMap::new);
    KeyValue<String, String> resultKeyValue = null;
    //Send null if there are no updates to be sent to downstream processors
    if (value.getState() == WebSocketConnection.CONNECTED) {
        // First session for this key: the user has just come online
        if (valueToUpdate.isEmpty()) {
            resultKeyValue = KeyValue.pair(key, "Online");
        }
        valueToUpdate.put(value.getSessionId(), value);
    }
    else {
        valueToUpdate.remove(value.getSessionId());
        // Last session closed: the user has gone offline
        if (valueToUpdate.isEmpty()) {
            resultKeyValue = KeyValue.pair(key, "Offline");
        }
    }
    // Write the modified map back so the change is persisted
    stateStore.put(key, valueToUpdate);
    return resultKeyValue;
}
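As a quick sanity check of the Online/Offline logic, the same read-modify-write steps can be replayed without Kafka. This is only a sketch: SessionStatusSketch, onEvent and replay are hypothetical names, a plain HashMap stands in for the state store, and session values are simplified to strings instead of ConnectionEvent.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SessionStatusSketch {
    // Plain map standing in for the state store (assumption, not a Kafka class)
    private final Map<String, Map<String, String>> store = new HashMap<>();

    /** Returns "Online", "Offline", or null when nothing should be forwarded. */
    String onEvent(String user, String sessionId, boolean connected) {
        // Copy on read to mimic a store that hands out deserialized copies
        Map<String, String> sessions = new HashMap<>();
        Map<String, String> stored = store.get(user);
        if (stored != null) {
            sessions.putAll(stored);
        }

        String status = null;
        if (connected) {
            if (sessions.isEmpty()) {
                status = "Online";   // first open session for this user
            }
            sessions.put(sessionId, "CONNECTED");
        } else {
            sessions.remove(sessionId);
            if (sessions.isEmpty()) {
                status = "Offline";  // last open session just closed
            }
        }

        store.put(user, sessions);   // write the modified copy back
        return status;
    }

    /** Replays two connects followed by two disconnects for one user. */
    static List<String> replay() {
        SessionStatusSketch tracker = new SessionStatusSketch();
        List<String> emitted = new ArrayList<>();
        emitted.add(tracker.onEvent("alice", "s1", true));
        emitted.add(tracker.onEvent("alice", "s2", true));
        emitted.add(tracker.onEvent("alice", "s1", false));
        emitted.add(tracker.onEvent("alice", "s2", false));
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(replay()); // prints [Online, null, null, Offline]
    }
}
```

Only the first connect and the last disconnect produce a status change; the events in between return null, which the transformer uses to signal that nothing should be sent downstream.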
Upvotes: 0