Reputation: 91
I am new to Spring Cloud Stream and trying to produce Avro messages based on the Confluent Schema Registry.
I can get the very basic example at https://cloud.spring.io/spring-cloud-stream/ working, but when I try to extend it to use Avro, I get exceptions.
As a generic test environment, I am running the Docker image: https://github.com/Landoop/fast-data-dev/
The test project was generated with Spring Initializr (https://start.spring.io/) and then slightly modified.
The pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.demo.kafka</groupId>
    <artifactId>kafka-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <packaging>jar</packaging>
    <name>kafka-demo</name>
    <description>Demo project for Spring Boot</description>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>1.5.4.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
        <spring-cloud.version>Dalston.SR1</spring-cloud.version>
        <project-avro.version>7.0.0-SNAPSHOT</project-avro.version>
        <apache.avro.version>1.8.2</apache.avro.version>
    </properties>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
            <version>1.2.2.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>joda-time</groupId>
            <artifactId>joda-time</artifactId>
            <version>2.9.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>${apache.avro.version}</version>
        </dependency>
        <dependency>
            <groupId>project.foundation</groupId>
            <artifactId>maritz-avro</artifactId>
            <version>${project-avro.version}</version>
        </dependency>
    </dependencies>
</project>
I have published the following schema to the registry:
{
  "type": "record",
  "name": "RoutingSlipMsg",
  "fields": [
    {
      "name": "projectNumber",
      "type": "string"
    },
    {
      "name": "clientName",
      "type": "string"
    },
    {
      "name": "programTheme",
      "type": "string"
    },
    {
      "name": "databaseName",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "actionType",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}
Original Test application.yml
spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
          # contentType: application/avro
          # contentType: application/*+avro
          contentType: application/com.project.RoutingSlipMsg.v1+avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100
Test application.yml edits:
spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
          # contentType: application/*+avro
          # contentType: application/avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100
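A side note on the contentType values tried above: as far as I understand the Spring Cloud Stream reference, the binding is meant to be set to application/*+avro, and outbound messages then carry a header of the form application/[prefix].[subject].v[version]+avro. The sketch below is plain Java and only illustrates that naming scheme. The class name, the regular expression, and the "vnd" default prefix are my assumptions for illustration, not the library's own code.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustration only: models the application/[prefix].[subject].v[version]+avro scheme.
final class AvroContentTypeSketch {

    // Assumed default prefix; the reference describes the prefix as configurable.
    static final String DEFAULT_PREFIX = "vnd";

    // Hypothetical pattern written for this sketch, not copied from the library.
    private static final Pattern VERSIONED = Pattern.compile(
            "application/(\\p{Alnum}+)\\.([\\p{Alnum}$.]+)\\.v(\\p{Digit}+)\\+avro");

    // Builds a versioned content type from its three parts.
    static String format(String prefix, String subject, int version) {
        return "application/" + prefix + "." + subject + ".v" + version + "+avro";
    }

    // Returns {prefix, subject, version}, or null when the value is not a versioned type.
    static String[] parse(String contentType) {
        Matcher matcher = VERSIONED.matcher(contentType);
        if (!matcher.matches()) {
            return null;
        }
        return new String[] {matcher.group(1), matcher.group(2), matcher.group(3)};
    }

    public static void main(String[] args) {
        // "routingslipmsg" is a hypothetical subject name used for the demo.
        System.out.println(format(DEFAULT_PREFIX, "routingslipmsg", 1));
        // The wildcard form used on the binding is not a versioned type.
        System.out.println(parse("application/*+avro") == null);
    }
}
```

The point of the sketch is only that the wildcard value and the versioned value play different roles: the first selects the Avro converter on the binding, the second describes one concrete schema version on a message.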
Configuration class
@Configuration
public class DemoConfiguration {

    @Bean
    public ConfluentSchemaRegistryClient schemaRegistryClient() {
        ConfluentSchemaRegistryClient registry = new ConfluentSchemaRegistryClient();
        registry.setEndpoint("http://192.168.99.100:8081");
        return registry;
    }
}
The auto topic creation seems to be working properly. I can change the destination: value to something else each time and a new topic will be created.
Test
package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaDemoApplicationTests {

    @Autowired
    private KafkaRoutingSlipSender kafkaRoutingSlipSender;

    @Test
    public void testSend() throws Exception {
        RoutingSlipMsg routingSlip = new RoutingSlipMsg();
        routingSlip.setActionType("FAKE_ACTION");
        routingSlip.setClientName("TEST_CLIENT");
        routingSlip.setDatabaseName("NONE");
        routingSlip.setProgramTheme("THEME");
        routingSlip.setProjectNumber("ABC123");
        kafkaRoutingSlipSender.send(routingSlip);
    }
}
BootApplication class
package com.project.kafka.kafkademo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}
Original Sender class
package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {

    @InboundChannelAdapter(value = Source.OUTPUT)
    public MessageSource<RoutingSlipMsg> send(RoutingSlipMsg data) {
        System.out.println("**** Hello World ****");
        return () -> {
            return new GenericMessage<>(data);
        };
    }
}
Sender Class Edits:
@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {

    @Autowired
    private Source source;

    public void send(RoutingSlipMsg data) {
        System.out.println("**** Hello World ****");
        source.output().send(MessageBuilder.withPayload(data).build());
    }
}
Original Stacktrace / Exception due to inappropriate use of @InboundChannelAdapter
2017-07-06 12:20:48.645 ERROR 69011 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments
at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:119)
at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:201)
at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:116)
... 20 more
I have reviewed several different posts on here, but nothing seems to be helping. What might I be missing?
Update 1:
I have made updates to the KafkaRoutingSlipSender class and the application.yml as shown above. With the contentType removed, these changes will publish a message like:
ÿcontentType?"application/x-java-object;type=com.maritz.avro.RoutingSlipMsg"FAKE_ACTIOÎTEST_CLIENÔNONÅTHEMÅABC12³
to the topic.
However, when I set the contentType to application/*+avro, the @Autowired Source appears to be causing a NullPointerException.
New Stacktrace / Exception:
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=9a9ebc45-9431-8de5-1cb0-40d191082599, timestamp=1499438830456}]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:22)
at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:57)
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
... 31 more
Update 2 (07-10-2017):
I have fixed the null Source issue, thanks to this information:
Spring Boot: Configuration Class is simply ignored and not loaded
The ConfluentSchemaRegistryClient is now being used. I can step into the code when debugging, but it doesn't find the schema.
ConfluentSchemaRegistryClient.register(String subject, String format, String schema)
ResponseEntity<Map> response = this.template.exchange(this.endpoint + path, HttpMethod.POST, request, Map.class, new Object[0]);
New Stacktrace / Exception (07-10-2017):
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is org.springframework.web.client.HttpClientErrorException: 404 Not Found
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=73365130-a91b-ba9c-aaf0-77cfaa26f73d, timestamp=1499700409997}]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:29)
at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:52)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.springframework.web.client.HttpClientErrorException: 404 Not Found
at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:63)
at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:700)
at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:653)
at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613)
at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:531)
at org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient.register(ConfluentSchemaRegistryClient.java:73)
at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
... 31 more
Is register() supposed to create a new schema if that one doesn't exist, or is the 404 appropriate behavior? Quite confused.
Upvotes: 3
Views: 4247
Reputation: 4156
We are working on the docs; they should be ready soon. In the meantime, just expose this bean in your application:
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint) {
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endpoint);
    return client;
}
Upvotes: 1
Reputation: 4156
What you see as application/x-java-object;type=com.maritz.avro.RoutingSlipMsg appears because the default for Java objects is to serialize them using Kryo, and that is what gets set as the contentType.
Since you want to use the Confluent Schema Registry, which is not the default, you need to set up a ConfluentSchemaRegistryClient bean. See here for how to enable it: http://docs.spring.io/spring-cloud-stream/docs/Ditmars.BUILD-SNAPSHOT/reference/htmlsingle/#_schema_registry_client
Right now you have a DefaultSchemaRegistryClient, and it is probably failing because the REST interfaces of the default server and the Confluent server are not the same.
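For comparison, here is a rough sketch of what a registration request against the Confluent REST API looks like when sent without Spring, which can help check the registry endpoint on its own. It assumes the Confluent convention of POSTing a JSON body with a "schema" field to /subjects/{subject}/versions. The class name, the "routingslipmsg" subject, and the trimmed schema are hypothetical, and the subject is assumed to need no URL encoding.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Sketch of a Confluent-style schema registration request using only the JDK.
final class ConfluentRegisterProbe {

    // Path used to register a schema under a subject.
    static String registerPath(String subject) {
        return "/subjects/" + subject + "/versions";
    }

    // Wraps the Avro schema text as the JSON string value of a "schema" field.
    // Only quotes, backslashes, and common whitespace escapes are handled.
    static String registerBody(String avroSchemaJson) {
        StringBuilder escaped = new StringBuilder();
        for (char c : avroSchemaJson.toCharArray()) {
            switch (c) {
                case '"': escaped.append("\\\""); break;
                case '\\': escaped.append("\\\\"); break;
                case '\n': escaped.append("\\n"); break;
                case '\r': escaped.append("\\r"); break;
                case '\t': escaped.append("\\t"); break;
                default: escaped.append(c);
            }
        }
        return "{\"schema\":\"" + escaped + "\"}";
    }

    // Sends the registration request and returns the HTTP status code.
    static int register(String endpoint, String subject, String avroSchemaJson) throws IOException {
        HttpURLConnection connection =
                (HttpURLConnection) new URL(endpoint + registerPath(subject)).openConnection();
        connection.setRequestMethod("POST");
        connection.setRequestProperty("Content-Type", "application/vnd.schemaregistry.v1+json");
        connection.setDoOutput(true);
        try (OutputStream out = connection.getOutputStream()) {
            out.write(registerBody(avroSchemaJson).getBytes(StandardCharsets.UTF_8));
        }
        try {
            return connection.getResponseCode();
        } finally {
            connection.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String subject = "routingslipmsg"; // hypothetical subject name
        String schema = "{\"type\":\"record\",\"name\":\"RoutingSlipMsg\",\"fields\":["
                + "{\"name\":\"projectNumber\",\"type\":\"string\"}]}";
        System.out.println("POST " + registerPath(subject));
        System.out.println(registerBody(schema));
        // The network call only happens when an endpoint is passed, e.g. http://192.168.99.100:8081
        if (args.length > 0) {
            System.out.println("HTTP status: " + register(args[0], subject, schema));
        }
    }
}
```

If a request like this succeeds against the same endpoint while the application still fails, the difference is more likely in which client bean or path the application ends up using than in the registry itself.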
Upvotes: 3
Reputation: 174769
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
An @InboundChannelAdapter method cannot have any parameters...
/**
* Indicates that a method is capable of producing a {@link org.springframework.messaging.Message}
* or {@link org.springframework.messaging.Message} {@code payload}.
* <p>
* A method annotated with {@code @InboundChannelAdapter} can't accept any parameters.
* <p>
* Return values from the annotated method may be of any type. If the return
* value is not a {@link org.springframework.messaging.Message}, a {@link org.springframework.messaging.Message}
* will be created with that object as its {@code payload}.
...
It is polled for a message, or message payload.
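The "wrong number of arguments" message itself comes from plain Java reflection, which the stack trace shows being reached through ReflectionUtils.invokeMethod. A minimal sketch of the same failure, with no Spring classes involved (the class and method names here are hypothetical stand-ins):

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Plain-JDK illustration of invoking a method reflectively with no arguments.
final class PolledMethodArityDemo {

    // Hypothetical stand-in for a bean with one no-arg and one one-arg method.
    public static final class Sender {
        public String poll() {
            return "payload";
        }

        public String send(String data) {
            return data;
        }
    }

    // Looks up the method, then invokes it WITHOUT arguments, as a poller would.
    // Returns the result, or a description of the IllegalArgumentException.
    static String invokeWithoutArguments(Object target, String name, Class<?>... parameterTypes) {
        try {
            Method method = target.getClass().getMethod(name, parameterTypes);
            return String.valueOf(method.invoke(target)); // no arguments supplied
        } catch (IllegalArgumentException e) {
            return "IllegalArgumentException: " + e.getMessage();
        } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Sender sender = new Sender();
        // The no-arg method can be polled.
        System.out.println(invokeWithoutArguments(sender, "poll"));
        // The one-arg method fails with an argument-count error.
        System.out.println(invokeWithoutArguments(sender, "send", String.class));
    }
}
```

The exact wording of the exception message varies between JDK versions, so the sketch only relies on the exception type.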
Upvotes: 3