Reputation: 327
I need some help integrating Kafka with Spring Cloud Stream. The application is very simple, with two parts (run as separate Java processes).
I have created RequestSenderChannel and ResponseReceiverChannel interfaces for the consumer application, and RequestReceiverChannel and ResponseSenderChannel for the producer application. Both applications share the same YAML file. According to the documentation, spring.cloud.stream.bindings.<binding-name>.destination should specify the topic to which a message is sent or from which it is received. But when I run the application, it creates topics named 'RequestSender', 'RequestReceiver', 'ResponseSender' and 'ResponseReceiver' in Kafka.
My assumption was that, since the destinations in the YAML file specify only two topics, 'RequestTopic' and 'ResponseTopic', only those two topics would be created. Instead, Kafka topics are created for the binding names listed under 'spring.cloud.stream.bindings' in the YAML file. Can someone please point out the issue in the configuration/code?
public interface RequestReceiverChannel
{
    String requestReceiver = "RequestReceiver";

    @Input(requestReceiver)
    SubscribableChannel pathQueryRequest();
}

public interface RequestSenderChannel
{
    String RequestSender = "RequestSender";

    @Output(RequestSender)
    MessageChannel pathQueryRequestSender();
}

public interface ResponseReceiverChannel
{
    String ResponseReceiver = "ResponseReceiver";

    @Input(ResponseReceiver)
    SubscribableChannel pceResponseServiceReceiver();
}

public interface ResponseSenderChannel
{
    String ResponseSender = "ResponseSender";

    @Output(ResponseSender)
    MessageChannel pceResponseService();
}
The YAML configuration file
spring:
  cloud:
    stream:
      defaultBinder: kafka
      bindings:
        RequestSender:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseSender:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
        RequestReceiver:
          binder: kafka
          destination: RequestTopic
          content-type: application/protobuf
          group: consumergroup
        ResponseReceiver:
          binder: kafka
          destination: ResponseTopic
          content-type: application/protobuf
          group: consumergroup
      kafka:
        bindings:
          RequestTopic:
            consumer:
              autoCommitOffset: false
          ResponseTopic:
            consumer:
              autoCommitOffset: false
        binder:
          brokers: ${SERVICE_KAFKA_HOST:localhost}
          zkNodes: ${SERVICE_ZOOKEEPER_HOST:127.0.0.1}
          defaultZkPort: ${SERVICE_ZOOKEEPER_PORT:2181}
          defaultBrokerPort: ${SERVICE_KAFKA_PORT:9092}
Upvotes: 0
Views: 1226
Reputation: 6126
By setting spring.cloud.stream.bindings.<binding-name>.destination=foo you are asking to map the binding specified by <binding-name> (e.g., RequestSender) to a broker destination named foo. If that destination does not exist, it will be auto-provisioned.
So there are no issues.
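To make the mapping concrete, here is a minimal plain-Java sketch of how a binding name resolves to a destination. It is not the framework's code: the class and the resolveDestination helper are hypothetical, and the property map stands in for the loaded configuration. It also shows the documented fallback: when no destination is set for a binding, the binding name itself is used as the destination, which is the symptom you would see if the destination properties were not picked up.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration only; Spring Cloud Stream does this internally.
class BindingResolutionSketch {

    // Returns the configured destination for a binding, or the binding
    // name itself when no destination property is present.
    static String resolveDestination(Map<String, String> properties, String bindingName) {
        String key = "spring.cloud.stream.bindings." + bindingName + ".destination";
        return properties.getOrDefault(key, bindingName);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Only RequestSender has an explicit destination in this sketch.
        props.put("spring.cloud.stream.bindings.RequestSender.destination", "RequestTopic");

        // Destination configured: the binding maps to RequestTopic.
        System.out.println(resolveDestination(props, "RequestSender"));
        // No destination configured: falls back to the binding name.
        System.out.println(resolveDestination(props, "ResponseSender"));
    }
}
```

If topics named after the bindings show up on the broker, it is worth checking that the YAML indentation places the bindings section under spring.cloud.stream and that the file is actually loaded by both processes.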
That said, we've just released Horsham.RELEASE (part of Spring Cloud Hoxton.RELEASE), and we are moving away from the annotation-based model you are currently using in favor of a significantly simpler functional model. You can read more about it in our release blog, which also links to four posts where we elaborate and provide more examples of the functional programming paradigm.
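As a rough idea of what the functional model looks like, here is a plain-Java sketch. The handler name handleRequest and the String payloads are assumptions for illustration (your application uses protobuf messages). In a real application the method would be exposed as a Spring @Bean, and the framework would derive the binding names from the function name using the <functionName>-in-<index> and <functionName>-out-<index> convention.

```java
import java.util.function.Function;

// Hypothetical sketch: runs without Spring to show the shape of the model.
class FunctionalModelSketch {

    // In a Spring Cloud Stream application this method would carry @Bean,
    // replacing the @Input/@Output channel interfaces.
    static Function<String, String> handleRequest() {
        return request -> "response-for-" + request;
    }

    // Binding names derived from the function name in the functional model.
    static String inputBinding(String functionName) {
        return functionName + "-in-0";
    }

    static String outputBinding(String functionName) {
        return functionName + "-out-0";
    }

    public static void main(String[] args) {
        // The function itself is ordinary Java and can be called directly.
        System.out.println(handleRequest().apply("path-query"));

        // The destination properties would then be keyed by these binding names.
        System.out.println("spring.cloud.stream.bindings."
                + inputBinding("handleRequest") + ".destination=RequestTopic");
        System.out.println("spring.cloud.stream.bindings."
                + outputBinding("handleRequest") + ".destination=ResponseTopic");
    }
}
```

The destination property works the same way as before; only the source of the binding name changes, from the annotation value to the function name.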
Upvotes: 2