Reputation: 394
I'm trying to consume Confluent Avro messages from a Kafka topic as a KStream with Spring Boot 2.0. I was able to consume the message as a MessageChannel, but not as a KStream.
@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}
Exception:
Exception in thread "pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3" org.apache.kafka.streams.errors.StreamsException: stream-thread [pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:859)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:134)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
    ... 3 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:855)
    ... 19 more
Based on the error, I think I'm missing the schema.registry.url configuration that Confluent requires. I had a quick look at the sample here, but I'm a bit lost on how to do the same with Spring Cloud Stream using the StreamListener.

Does this need to be a separate configuration, or is there a way to configure the schema.registry.url that Confluent is looking for in application.yml itself?

Here is the code repo: https://github.com/naveenpop/springboot-kstream-confluent
Organization.avsc
{
   "namespace": "com.test.demo.avro",
   "type": "record",
   "name": "Organization",
   "fields": [
      {
         "name": "orgId",
         "type": "string",
         "default": "null"
      },
      {
         "name": "orgName",
         "type": "string",
         "default": "null"
      },
      {
         "name": "orgType",
         "type": "string",
         "default": "null"
      },
      {
         "name": "parentOrgId",
         "type": "string",
         "default": "null"
      }
   ]
}
DemokstreamApplication.java
@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    @Component
    public static class OrganizationProducer implements ApplicationRunner {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("Starting: Run method");

            List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
            List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");

            // publish a random Organization once per second
            Runnable runnable = () -> {
                String rPage = pages.get(new Random().nextInt(pages.size()));
                String rName = names.get(new Random().nextInt(names.size()));

                try {
                    this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
                } catch (Exception e) {
                    log.info("Exception :" + e);
                }
            };

            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
        }
    }
}
KafkaConfig.java
@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }
}
KafkaConsumer.java
@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {

    @StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
        organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
    }
}
KafkaProducer.java
@Slf4j // required for the log.error call below to compile
@EnableBinding(KstreamBinding.class)
public class KafkaProducer {

    @Autowired
    private KstreamBinding kstreamBinding;

    public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {
        try {
            Organization organization = Organization.newBuilder()
                    .setOrgId(orgId)
                    .setOrgName(orgName)
                    .setOrgType(orgType)
                    .setParentOrgId(parentOrgId)
                    .build();

            kstreamBinding.organizationOutputMessageChannel()
                    .send(MessageBuilder.withPayload(organization)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
                            .build());
        } catch (Exception e) {
            log.error("Failed to produce Organization Message:" + e);
        }
    }
}
KstreamBinding.java
public interface KstreamBinding {

    String ORGANIZATION_INPUT = "organizationInput";
    String ORGANIZATION_OUTPUT = "organizationOutput";

    @Input(ORGANIZATION_INPUT)
    KStream<String, Organization> organizationInputMessageChannel();

    @Output(ORGANIZATION_OUTPUT)
    MessageChannel organizationOutputMessageChannel();
}
Update 1:
I applied the suggestion from dturanski here and the error vanished. However, I'm still not able to consume the message as KStream<String, Organization>; there is no error in the console.

Update 2:
I applied the suggestion from sobychacko here and the message is now consumable, but the object arrives with empty values. I've made a commit to the GitHub sample to produce the message from Spring Boot itself, and I'm still getting it with empty values.

Thanks for your time on this issue.
Upvotes: 1
Views: 1455
Reputation: 5904
The following implementation will not do what you are intending:
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}
That log statement is only invoked once, at the bootstrap phase. For this to work, you need to invoke an operation on the received KStream and provide your logic there. For example, the following works, where I provide a lambda expression to the foreach method call.
@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    organization.foreach((s, organization1) -> log.info("Organization Received:" + organization1));
}
You also have an issue in the configuration where you are wrongly assigning the Avro Serde for keys, when the keys are actually Strings. Change it like this:
default:
  key:
    serde: org.apache.kafka.common.serialization.Serdes$StringSerde
  value:
    serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
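For context, this default block belongs under the Kafka Streams binder configuration in application.yml, as the full configuration later in this answer also shows:

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde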
With these changes, I get the logging statement each time I send something to the topic. However, there is a problem in your sending groovy script: I am not getting any actual data from your Organization domain, but I will let you figure that out.

Update on the issue with the empty Organization domain object:

This happens because you have a mixed mode of serialization strategies going on. You are using Spring Cloud Stream's Avro message converters on the producer side, but on the Kafka Streams processor, the Confluent Avro Serdes. I just tried with Confluent's serializers all the way from producer to processor, and I was able to see the Organization domain object on the outbound. Here is the modified configuration to make the serialization consistent.
spring:
  application:
    name: kstream
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      schema:
        avro:
          schema-locations: classpath:avro/Organization.avsc
      bindings:
        organizationInput:
          destination: organization-updates
          group: demokstream.org
          consumer:
            useNativeDecoding: true
        organizationOutput:
          destination: organization-updates
          producer:
            useNativeEncoding: true
      kafka:
        bindings:
          organizationOutput:
            producer:
              configuration:
                key.serializer: org.apache.kafka.common.serialization.StringSerializer
                value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
                schema.registry.url: http://localhost:8081
        streams:
          binder:
            brokers: localhost
            configuration:
              schema.registry.url: http://localhost:8081
              commit:
                interval:
                  ms: 1000
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$StringSerde
                value:
                  serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
You can also remove the KafkaConfig class as well as the EnableSchemaRegistryClient annotation from the main application class.
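For illustration, here is a minimal sketch of what the main class from the question would look like after those removals (the producer runner is kept as-is):

@SpringBootApplication
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    // the OrganizationProducer ApplicationRunner from the question remains unchanged
}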
Upvotes: 3
Reputation: 1723
Try spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url: ...
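In application.yml form, that property nests like this (a minimal sketch; http://localhost:8081 is assumed as the Schema Registry endpoint, matching the rest of this question):

spring:
  cloud:
    stream:
      kafka:
        streams:
          binder:
            configuration:
              schema.registry.url: http://localhost:8081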
Upvotes: 1