Lauren A.

Reputation: 11

Kafka Streams Internal Topics Redirection

I'm currently working with Kafka Streams to aggregate events in a client's system. When running our prototype with fake events, everything works perfectly. However, when using actual data, we noticed that during aggregation, Streams automatically creates internal topics. In theory this is fine, but our client has necessarily tight security and is unwilling to grant my development team topic-creation privileges. This means we cannot run our Streams program as-is.

We can, though, have topics created for us and use those instead of letting Streams create its own Kafka topics. Is it possible to redirect Streams' internal topic creation so that it uses existing topics, and if so, how would one go about it?

Note: We can name the internal topics whatever we want. They just have to be created by the team that has those privileges.

Upvotes: 0

Views: 604

Answers (1)

Ran Lupovich

Reputation: 1841

In Kafka Streams, there are now overloaded methods for both KStream and KTable that accept a new parameter, Named. By passing Named to these DSL methods, users can give meaningful names to the processors in their topology.

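StreamsBuilder builder = new StreamsBuilder();
// Name the source, each processor, and the sink explicitly.
KStream<String, String> stream =
    builder.stream("input", Consumed.as("Customer_transactions_input_topic"));
stream.filter((k, v) -> !v.equals("invalid_txn"), Named.as("filter_out_invalid_txns"))
      // substring(0, 6) keeps the first six characters, matching the processor name
      .mapValues(v -> v.substring(0, 6), Named.as("Map_values_to_first_6_characters"))
      .to("output", Produced.as("Mapped_transactions_output_topic"));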

Now, take a look at your topology with all the processors named:

Topologies:
  Sub-topology: 0
   Source: Customer_transactions_input_topic (topics: [input])
     --> filter_out_invalid_txns
   Processor: filter_out_invalid_txns (stores: [])
     --> Map_values_to_first_6_characters
     <-- Customer_transactions_input_topic
   Processor: Map_values_to_first_6_characters (stores: [])
     --> Mapped_transactions_output_topic
     <-- filter_out_invalid_txns
   Sink: Mapped_transactions_output_topic (topic: output)
     <-- Map_values_to_first_6_characters

With every processor named, you can read the topology description and easily understand what role each one plays. There is a second reason to choose names yourself, and it matters for stateful operators. Their state stores, changelog topics, and repartition topics persist between restarts of your Kafka Streams application. If you leave the names to be generated, they include a counter that can shift whenever the topology changes. Explicit names stay stable, which also makes the internal topic names predictable, so the team with topic-creation privileges should be able to create them ahead of time.
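To tie this back to the question: as far as I know, Kafka Streams names its internal topics <application.id>-<name>-repartition and <application.id>-<name>-changelog, where <name> is whatever you supply through Grouped.as(...) or Repartitioned.as(...) for repartitioning and Materialized.as(...) for state stores. Below is a minimal sketch, in plain Java with no Kafka dependency, of deriving the list of topics to hand to the team that creates them. The class, the application id, and the operator and store names are hypothetical placeholders, not anything from your project.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Sketch: derive the internal topic names to request in advance,
 * assuming every stateful operator and store was named explicitly.
 */
final class InternalTopicNames {

    // Assumed convention: "<application.id>-<name>-repartition"
    static String repartitionTopic(String applicationId, String operatorName) {
        return applicationId + "-" + operatorName + "-repartition";
    }

    // Assumed convention: "<application.id>-<storeName>-changelog"
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    // Collects repartition topics first, then changelog topics, in input order.
    static List<String> topicsToRequest(String applicationId,
                                        List<String> repartitionNames,
                                        List<String> storeNames) {
        List<String> topics = new ArrayList<>();
        for (String name : repartitionNames) {
            topics.add(repartitionTopic(applicationId, name));
        }
        for (String name : storeNames) {
            topics.add(changelogTopic(applicationId, name));
        }
        return topics;
    }

    public static void main(String[] args) {
        // Hypothetical names; use the ones you pass to Grouped.as / Materialized.as.
        List<String> topics = topicsToRequest(
                "event-aggregator",
                Arrays.asList("events-by-customer"),
                Arrays.asList("event-counts-store"));
        topics.forEach(System.out::println);
    }
}
```

The pre-created topics generally need to match what Streams expects, in particular the partition count of the input topics, and changelog topics are normally compacted. It is worth checking the generated names against the output of builder.build().describe() and the security section of the Streams docs for your version before requesting them.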

Upvotes: 0
