Cortlendt

Reputation: 2240

Kafka Stream is not populated

I have a very simple Java/Spring application to demonstrate KStream functionality, but I cannot get the KStream to load data. The idea is to create a KStream object and simply retrieve its contents from a controller GET method. Sample code:

@RestController
@RequestMapping("/resources/")
public class StreamController {

   private KafkaStreams streams;
   private KStream<String, ResourceMessage> resourceStream;

   StreamController() {

      // configure streams/consumer
      Properties props = new Properties();

      // make sure stream starts from the beginning
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, UUID.randomUUID().toString());
      props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
      props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
      props.put(StreamsConfig.STATE_DIR_CONFIG, Path.of(System.getProperty("java.io.tmpdir")).toAbsolutePath().toString());

      //create POJO serdes
      StreamsBuilder builder = new StreamsBuilder();
      Map<String, Object> serdeProps = new HashMap<>();
      Serializer<ResourceMessage> resourceSerializer = new JsonPOJOSerializer<>();
      serdeProps.put("JsonPOJOClass", ResourceMessage.class);
      resourceSerializer.configure(serdeProps, false);
      Deserializer<ResourceMessage> resourceDeserializer = new JsonPOJODeserializer<>();
      serdeProps.put("JsonPOJOClass", Resource.class);
      resourceDeserializer.configure(serdeProps, false);
      Serde<ResourceMessage> resourceSerde = Serdes.serdeFrom(resourceSerializer, resourceDeserializer);

      // create KStream with POJO serdes for value
      resourceStream = builder.stream("Resources", Consumed.with(Serdes.String(), resourceSerde));
      streams = new KafkaStreams(builder.build(), props);
      streams.start();
     }

   // GET method that enumerates KStream and returns contents
   @GetMapping(value = "/resource")
   public List<Resource> getResources() {

       List<ResourceMessage> messages = new LinkedList<ResourceMessage>();

       // problem is here - there are messages in the topic but KStream returns no values in foreach(...)
       resourceStream.foreach((key, value) -> messages.add(value));

       return messages.stream().map(m -> Resource.builder()
            .id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
   }
}

Problem: there are messages in the topic, but enumerating the KStream with foreach(...) retrieves no results. The KafkaStreams instance state is "RUNNING" and there are no errors in the logs.

Generating a random APPLICATION_ID and setting AUTO_OFFSET_RESET to "earliest" does not help. I can clearly see messages in the topic using Kafka Tool. Adding new messages while the controller is running does not help either. Is there anything I'm missing or misunderstanding about Kafka Streams?

PS: I'm using the POJO serializer and deserializer examples from here.

Upvotes: 0

Views: 634

Answers (1)

Ismail

Reputation: 2992

Kafka Streams is a Kafka client library for real-time stream processing. In your case you don't need a Kafka Streams client (it won't work the way you are using it); you need a plain Kafka Consumer that polls records from Kafka and sends them back through the REST API. For example:

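import java.time.Duration;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/resources/")
public class StreamController {

   // topic name taken from the question
   private static final String TOPIC = "Resources";

   private final Consumer<String, ResourceMessage> consumer;

   StreamController() {

      // configure consumer properties
      Properties props = new Properties();

      // set the remaining properties (group id, key/value deserializers) for your setup

      props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

      // create the consumer using props
      consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Collections.singletonList(TOPIC));
   }

   // GET method that polls the topic and returns the records received since the last poll.
   // synchronized because KafkaConsumer is not safe for concurrent use by several threads.
   @GetMapping(value = "/resource")
   public synchronized List<Resource> getResources() {

      List<ResourceMessage> messages = new LinkedList<>();

      // poll(long) is deprecated, so use the Duration overload
      ConsumerRecords<String, ResourceMessage> consumerRecords =
            consumer.poll(Duration.ofMillis(1000));

      // collect the record values (your custom POJO)
      for (ConsumerRecord<String, ResourceMessage> record : consumerRecords) {
         messages.add(record.value());
      }

      return messages.stream().map(m -> Resource.builder()
            .id(m.getResouceId()).resource(m.getResource()).build()).collect(Collectors.toList());
   }
}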

You can find a full example here: link.
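The consumer example above leaves most of the configuration to the reader. As a rough sketch of what that configuration might look like, the helper below builds the properties using the standard Kafka consumer configuration keys as plain strings. The class name ConsumerProps, the method name forTopicReplay and the "resources-reader-" group id prefix are made up for illustration, and the value deserializer class name is passed in because the package of the POJO deserializer is not shown in the question.

```java
import java.util.Properties;
import java.util.UUID;

// Hypothetical helper, not part of Kafka or Spring.
final class ConsumerProps {
    private ConsumerProps() {}

    // Builds consumer properties that read a topic from the beginning.
    static Properties forTopicReplay(String bootstrapServers, String valueDeserializerClass) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // A fresh group id has no committed offsets, so auto.offset.reset decides where to start.
        props.setProperty("group.id", "resources-reader-" + UUID.randomUUID());
        props.setProperty("auto.offset.reset", "earliest");
        // Do not commit offsets automatically for this throwaway group.
        props.setProperty("enable.auto.commit", "false");
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Assumption: the caller supplies the fully qualified name of their POJO deserializer.
        props.setProperty("value.deserializer", valueDeserializerClass);
        return props;
    }
}
```

The result could then be passed to new KafkaConsumer<>(props). Any extra settings that your deserializer reads in its configure(...) method would need to be added to the same properties.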

UPDATE

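Also, be aware that KStream operations such as foreach(...) only define the processing topology; they do not return the stream's contents. Your GET handler calls foreach(...) after the topology has already been built and started, so that action is never part of the running application and the API response ends up empty. (Note that a Spring @RestController is a singleton by default, not request scoped.) If you want to use Kafka Streams, define the full topology before starting it, for example at startup of your Spring Boot application. You can see this for example, link.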
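If you do go the Kafka Streams route, one possible shape is to have the stream write into a small thread-safe buffer that the controller reads from. The sketch below shows only that buffer, using plain Java; the class name RecentValues and its capacity parameter are hypothetical, and the Kafka wiring appears only in a comment because it depends on your setup.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;

// Hypothetical bounded buffer: the stream thread appends values,
// request threads read a snapshot. Not part of Kafka or Spring.
final class RecentValues<K, V> implements BiConsumer<K, V> {
    private final int capacity;
    private final Deque<V> values = new ArrayDeque<>();

    RecentValues(int capacity) {
        if (capacity <= 0) {
            throw new IllegalArgumentException("capacity must be positive");
        }
        this.capacity = capacity;
    }

    // Meant to be registered on the stream BEFORE builder.build(), e.g.
    //   builder.stream("Resources", Consumed.with(Serdes.String(), resourceSerde))
    //          .foreach(recent::accept);
    @Override
    public synchronized void accept(K key, V value) {
        if (values.size() == capacity) {
            values.removeFirst(); // drop the oldest value to stay within capacity
        }
        values.addLast(value);
    }

    // Meant to be called from the GET handler; returns a copy so callers
    // never see the buffer change underneath them.
    public synchronized List<V> snapshot() {
        return new ArrayList<>(values);
    }
}
```

The GET handler would then return recent.snapshot() mapped to Resource objects, instead of calling foreach(...) on every request. The buffer is bounded so that a long-running application does not accumulate every record in memory.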

Upvotes: 1
