Dusan.czh

Reputation: 757

Building a cache with Kafka Streams

I am trying to get a sense of what is possible and how to think when working with Kafka Streams.

The Use-Case:

There is a topic called Transactions.

I want to create a cache that will hold all recent transactions (last 10 minutes).

The cache can be queried by a REST client by providing the transaction reference.

Questions:

  1. Is Kafka Streams (along with its materialized views) a good fit to implement such a cache?
  2. If yes, how would you go about it? Remember, it needs to keep only the last 10 minutes of transactions and discard the older ones.
  3. If not, why not?

Upvotes: 1

Views: 1622

Answers (1)

Paweł Szymczyk

Reputation: 1792

Yes, it's a very good idea to implement this with Kafka Streams. How to do it?

  1. First, create a class that represents the values of the cache:
class Transaction {
    Instant createTime;
    Status status;
    String transactionReference;
    // plus a constructor and the getters (getCreateTime(),
    // getTransactionReference()) that the transformer below relies on
}
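A side note before moving on: the store and stream registered in step 3 serialize this class with a JsonSerdes.forA(...) helper that the answer doesn't show. It is not part of Kafka; a minimal Jackson-based sketch of what such a helper could look like (assuming jackson-databind plus jackson-datatype-jsr310 for the Instant field):

import java.io.IOException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;

public final class JsonSerdes {

    // JavaTimeModule makes Jackson handle java.time.Instant
    private static final ObjectMapper MAPPER =
        new ObjectMapper().registerModule(new JavaTimeModule());

    // Hypothetical helper, not a Kafka API: builds a JSON serde for any class
    public static <T> Serde<T> forA(Class<T> clazz) {
        Serializer<T> serializer = (topic, data) -> {
            try {
                return data == null ? null : MAPPER.writeValueAsBytes(data);
            } catch (JsonProcessingException e) {
                throw new SerializationException(e);
            }
        };
        Deserializer<T> deserializer = (topic, bytes) -> {
            try {
                return bytes == null ? null : MAPPER.readValue(bytes, clazz);
            } catch (IOException e) {
                throw new SerializationException(e);
            }
        };
        return Serdes.serdeFrom(serializer, deserializer);
    }

    private JsonSerdes() {
    }
}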
  2. Second, create a class that handles the cache logic by implementing org.apache.kafka.streams.kstream.Transformer<K, V, R>:
public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {

    private static final long MAINTAIN_DURATION_MS = TimeUnit.MINUTES.toMillis(10);

    private KeyValueStore<String, Transaction> transactions;

    @Override
    @SuppressWarnings("unchecked")
    public void init(ProcessorContext context) {
        // The cast is needed on older Kafka Streams versions, where
        // getStateStore(String) returns a plain StateStore
        this.transactions = (KeyValueStore<String, Transaction>) context.getStateStore("transactions-store");
        // Scan the store every 5 seconds (wall-clock time) and evict
        // entries older than 10 minutes
        context.schedule(Duration.ofSeconds(5), PunctuationType.WALL_CLOCK_TIME, timestamp ->
        {
            // KeyValueIterator holds resources, so close it via try-with-resources
            try (KeyValueIterator<String, Transaction> iterator = transactions.all()) {
                iterator.forEachRemaining(kv -> {
                    if (hasExpired(kv.value.getCreateTime().toEpochMilli(), timestamp)) {
                        transactions.delete(kv.key);
                    }
                });
            }
        });
    }

    private boolean hasExpired(final long eventTimestampMs, final long currentWallClockMs) {
        return (currentWallClockMs - eventTimestampMs) > MAINTAIN_DURATION_MS;
    }

    @Override
    public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
        // Cache the transaction under its reference if we haven't seen it yet;
        // returning null forwards nothing downstream
        Transaction existing = this.transactions.get(transaction.getTransactionReference());
        if (existing == null) {
            transactions.put(transaction.getTransactionReference(), transaction);
        }
        return null;
    }

    @Override
    public void close() {
        // Nothing to release; the state store is managed by Kafka Streams
    }
}
  3. Then, register the transformer in the topology:
    static StreamsBuilder buildKafkaStreamsTopology() {
        StreamsBuilder builder = new StreamsBuilder();

        // In-memory key-value store that backs the cache
        StoreBuilder<KeyValueStore<String, Transaction>> transactionsStore = Stores
            .keyValueStoreBuilder(Stores.inMemoryKeyValueStore("transactions-store"),
                Serdes.String(), JsonSerdes.forA(Transaction.class));
        builder.addStateStore(transactionsStore);

        builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
            .transform(TransactionsCache::new, "transactions-store");

        return builder;
    }
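If you want to convince yourself that the eviction works, kafka-streams-test-utils lets you drive the wall clock forward by hand. A rough sketch, assuming JUnit, the JsonSerdes helper sketched earlier, and a Transaction constructor and Status value that aren't in the original answer:

@Test
void evictsTransactionsOlderThanTenMinutes() {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transactions-cache-test");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

    try (TopologyTestDriver driver =
             new TopologyTestDriver(buildKafkaStreamsTopology().build(), props)) {

        TestInputTopic<String, Transaction> input = driver.createInputTopic(
            TRANSACTIONS, new StringSerializer(),
            JsonSerdes.forA(Transaction.class).serializer());

        input.pipeInput("key", new Transaction(Instant.now(), Status.PENDING, "ref-1"));

        KeyValueStore<String, Transaction> store = driver.getKeyValueStore("transactions-store");
        assertNotNull(store.get("ref-1"));

        // Move wall-clock time past the 10-minute window; the scheduled
        // punctuator fires and deletes the expired entry
        driver.advanceWallClockTime(Duration.ofMinutes(11));
        assertNull(store.get("ref-1"));
    }
}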
  4. Next, read the data in an HTTP controller:
@RestController
public class TransactionsController {

    private final KafkaStreams kafkaStreams;

    public TransactionsController(KafkaStreams kafkaStreams) {
        this.kafkaStreams = kafkaStreams;
    }

    @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
    Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
        ReadOnlyKeyValueStore<String, Transaction> store = kafkaStreams.store(
            StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore()));

        // May return null (empty body) on a miss; see the note below about
        // keys held by other instances
        return store.get(transactionReference);
    }
}
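The controller assumes a running KafkaStreams instance is available for injection. A minimal Spring configuration sketch that builds and starts it (the application id and bootstrap servers are placeholders, not from the original answer):

@Configuration
public class StreamsAppConfig {

    // close() is called on shutdown so the streams app stops cleanly
    @Bean(destroyMethod = "close")
    public KafkaStreams kafkaStreams() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "transactions-cache");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        KafkaStreams streams = new KafkaStreams(buildKafkaStreamsTopology().build(), props);
        streams.start();
        return streams;
    }
}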

  5. Last thing: remember that this in-memory cache is partitioned by default, so when you run multiple instances of the application you need some RPC mechanism to fetch the data from another instance on a cache miss (Kafka Interactive Queries); a sketch of this follows below. The second option is to use org.apache.kafka.streams.kstream.GlobalKTable<K, V> instead.
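For the interactive-queries route, a rough sketch of the miss handling inside the controller method: KafkaStreams#queryMetadataForKey (Kafka Streams 2.5+) reports which instance owns a key. This assumes the Transactions topic is keyed by the transaction reference (so store keys align with partitioning), that application.server is configured, and that localHostInfo and restTemplate are wired up elsewhere; none of these names come from the original answer.

// Inside getTransaction(), before falling back to the local store:
KeyQueryMetadata metadata = kafkaStreams.queryMetadataForKey(
    "transactions-store", transactionReference, Serdes.String().serializer());

HostInfo owner = metadata.activeHost();
if (!owner.equals(localHostInfo)) {
    // Another instance owns this key: proxy the request to it over HTTP
    String url = String.format("http://%s:%d/transactions/%s",
        owner.host(), owner.port(), transactionReference);
    return restTemplate.getForObject(url, Transaction.class);
}
return store.get(transactionReference);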

Upvotes: 2
