Thiru

Reputation: 327

Spring cloud stream with kafka

I need some help integrating Kafka with Spring Cloud Stream. The application is very simple, with two parts (run as separate Java processes):

  1. A consumer: puts the request into RequestTopic and gets the response from ResponseTopic.
  2. A producer: gets the request from RequestTopic and puts the response back into ResponseTopic.

I have created the RequestSenderChannel and ResponseReceiverChannel interfaces for the consumer application, and RequestReceiverChannel and ResponseSenderChannel for the producer application. Both applications share the same YAML file. As per the documentation, spring.cloud.stream.bindings.<binding-name>.destination should specify the topic to which the message is sent or from which it is received. But when I run the application, it creates topics named 'RequestSender', 'RequestReceiver', 'ResponseSender' and 'ResponseReceiver' in Kafka.

My assumption was: since destination in the YAML file specifies only two topics, 'RequestTopic' and 'ResponseTopic', the application should have created those two topics. Instead, it creates Kafka topics for the binding names specified under 'spring.cloud.stream.bindings' in the YAML file. Can someone please point out the issue in the configuration/code?

public interface RequestReceiverChannel
{
    String requestReceiver ="RequestReceiver";
    @Input(requestReceiver)
    SubscribableChannel pathQueryRequest();
}

public interface RequestSenderChannel
{
    String RequestSender ="RequestSender";
    @Output(RequestSender)
    MessageChannel pathQueryRequestSender();
}

public interface ResponseReceiverChannel
{
    String ResponseReceiver = "ResponseReceiver";
    @Input(ResponseReceiver)
    SubscribableChannel pceResponseServiceReceiver();
}

public interface ResponseSenderChannel
{
    String ResponseSender = "ResponseSender";
    @Output(ResponseSender)
    MessageChannel pceResponseService();
}

The YAML configuration file

spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        RequestSender:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseSender:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
        RequestReceiver:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseReceiver:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
      kafka:
        bindings:
          RequestTopic:
            consumer:
              autoCommitOffset: false
          ResponseTopic:
            consumer:
              autoCommitOffset: false
        binder:
          brokers: ${SERVICE_KAFKA_HOST:localhost}
          zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
          defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
          defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}

Upvotes: 0

Views: 1226

Answers (1)

Oleg Zhurakousky

Reputation: 6126

Setting spring.cloud.stream.bindings.<binding-name>.destination=foo maps the binding named <binding-name> (e.g., RequestSender) to a broker destination named foo. If that destination does not exist, it will be auto-provisioned. So there are no issues.
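
The lookup rule described above can be sketched in plain Java. This is only an illustration, not Spring Cloud Stream's actual code: the class DestinationLookup and its resolve method are hypothetical names. It also shows the documented fallback: when no destination property is found for a binding, the binding name itself is used as the destination.

```java
import java.util.Properties;

// Illustration only: a hypothetical helper, not part of Spring Cloud Stream.
public class DestinationLookup {

    static final String PREFIX = "spring.cloud.stream.bindings.";

    // Returns the destination configured for the binding, or the binding
    // name itself when no destination property is present.
    static String resolve(Properties props, String bindingName) {
        return props.getProperty(PREFIX + bindingName + ".destination", bindingName);
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty(PREFIX + "RequestSender.destination", "RequestTopic");

        // Mapping present: the configured topic is used.
        System.out.println(resolve(props, "RequestSender"));  // RequestTopic
        // Mapping absent: the binding name becomes the topic name.
        System.out.println(resolve(props, "ResponseSender")); // ResponseSender
    }
}
```

If the broker ends up with topics named after the bindings, one thing worth checking is whether the destination properties are actually being loaded by the running application.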

That said, we've just released Horsham.RELEASE (part of Spring Cloud Hoxton.RELEASE), and we are moving away from the annotation-based model you are currently using in favor of a significantly simpler functional model. You can read more about it in our release blog post, which also links to four posts where we elaborate on the functional programming paradigm and provide more examples.
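
As a rough sketch of what the functional model looks like for the producer side of the question: the handler is just a java.util.function.Function. The class name RequestHandler, the function name handleRequest, and the String payload are assumptions made for illustration (the question uses protobuf payloads). In a Spring Boot application the method would be a @Bean in a @Configuration class; the Spring annotations are left out here so the snippet runs with the JDK alone. With a function named handleRequest, the bindings are named handleRequest-in-0 and handleRequest-out-0, so the topics would be mapped with spring.cloud.stream.bindings.handleRequest-in-0.destination=RequestTopic and spring.cloud.stream.bindings.handleRequest-out-0.destination=ResponseTopic.

```java
import java.util.function.Function;

// Hypothetical handler for illustration; names and payload type are assumptions.
public class RequestHandler {

    // In Spring Cloud Stream this would be declared as a @Bean (non-static,
    // inside a @Configuration class). The framework would feed it messages
    // from the "handleRequest-in-0" binding and publish the return value
    // to the "handleRequest-out-0" binding.
    public static Function<String, String> handleRequest() {
        return request -> "response to " + request;
    }

    public static void main(String[] args) {
        // Calling the function directly, without any broker involved.
        System.out.println(handleRequest().apply("path-query")); // response to path-query
    }
}
```

No @Input/@Output interfaces are needed in this model; the input and output bindings are derived from the function name.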

Upvotes: 2
