dholtz
dholtz

Reputation: 91

Using Spring Cloud Stream to produce Avro Messages using the Confluent Schema Registry

I am new to Spring Cloud Stream and trying to produce Avro messages based on the Confluent Schema Registry.

I can get the very basic example, https://cloud.spring.io/spring-cloud-stream/ working, but when I try to extend it further to use Avro, I am getting exceptions.

As a generic test environment, I am running the Docker image: https://github.com/Landoop/fast-data-dev/

The test project was generated and slightly modified using Spring Initializr: https://start.spring.io/

The pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.demo.kafka</groupId>
  <artifactId>kafka-demo</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>kafka-demo</name>
  <description>Demo project for Spring Boot</description>


  <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.4.RELEASE</version>
    <relativePath/> <!-- lookup parent from repository -->
  </parent>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
    <java.version>1.8</java.version>
    <spring-cloud.version>Dalston.SR1</spring-cloud.version>
    <project-avro.version>7.0.0-SNAPSHOT</project-avro.version>
    <apache.avro.version>1.8.2</apache.avro.version>
  </properties>

  <dependencyManagement>
    <dependencies>
      <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-dependencies</artifactId>
        <version>${spring-cloud.version}</version>
        <type>pom</type>
        <scope>import</scope>
      </dependency>
    </dependencies>
  </dependencyManagement>

  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream-schema</artifactId>
      <version>1.2.2.RELEASE</version>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.9</version>
    </dependency>
    <dependency>
      <groupId>org.apache.avro</groupId>
      <artifactId>avro</artifactId>
      <version>${apache.avro.version}</version>
    </dependency>
    <dependency>
      <groupId>project.foundation</groupId>
      <artifactId>maritz-avro</artifactId>
      <version>${project-avro.version}</version>
    </dependency>
  </dependencies>
</project>

I have published the following schema to the registry:

{
  "type": "record",
  "name": "RoutingSlipMsg",
  "fields": [
    {
      "name": "projectNumber",
      "type": "string"
    },
    {
      "name": "clientName",
      "type": "string"
    },
    {
      "name": "programTheme",
      "type": "string"
    },
    {
      "name": "databaseName",
      "type": [
        "null",
        "string"
      ]
    },
    {
      "name": "actionType",
      "type": [
        "null",
        "string"
      ]
    }
  ]
}

Original Test application.yml

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
#          contentType: application/avro
#          contentType: application/*+avro
          contentType: application/com.project.RoutingSlipMsg.v1+avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100

Test application.yml edits:

spring:
  cloud:
    stream:
      schema:
        avro:
          dynamicSchemaGenerationEnabled: true
      bindings:
        output:
          binder: kafka
          destination: routing_slip_dev
#          contentType: application/*+avro
#          contentType: application/avro
      schemaRegistryClient:
        endpoint: http://192.168.99.100:8081
      kafka:
        binder:
          defaultBrokerPort: 9092
          defaultZkPort: 2181
          zkNodes: 192.168.99.100
          brokers: 192.168.99.100

Configuration class

@Configuration
public class DemoConfiguration {


  @Bean
  public ConfluentSchemaRegistryClient schemaRegistryClient() {
    ConfluentSchemaRegistryClient registry = new ConfluentSchemaRegistryClient();
    registry.setEndpoint("http://192.168.99.100:8081");
    return registry;
  }

}

The auto topic creation seems to be working properly. I can change the destination: value to something else each time and a new topic will be created.

Test

package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaDemoApplicationTests {

  @Autowired
  private KafkaRoutingSlipSender kafkaRoutingSlipSender;

  @Test
  public void testSend() throws Exception {

    RoutingSlipMsg routingSlip = new RoutingSlipMsg();
    routingSlip.setActionType("FAKE_ACTION");
    routingSlip.setClientName("TEST_CLIENT");
    routingSlip.setDatabaseName("NONE");
    routingSlip.setProgramTheme("THEME");
    routingSlip.setProjectNumber("ABC123");

    kafkaRoutingSlipSender.send(routingSlip);
  }
}

**BootApplication class*

package com.project.kafka.kafkademo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaDemoApplication {

  public static void main(String[] args) {
    SpringApplication.run(KafkaDemoApplication.class, args);
  }
}

Original Sender class

package com.project.kafka.kafkademo;

import com.project.avro.RoutingSlipMsg;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.cloud.stream.schema.client.EnableSchemaRegistryClient;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.support.GenericMessage;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {


  @InboundChannelAdapter(value = Source.OUTPUT)
  public MessageSource<RoutingSlipMsg> send(RoutingSlipMsg data) {
    System.out.println("**** Hello World ****");
    return () -> {
      return new GenericMessage<>(data);
    };
  }
}

Sender Class Edits:

@Component
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
public class KafkaRoutingSlipSender {

  @Autowired
  private Source source;

  public void send(RoutingSlipMsg data) {
    System.out.println("**** Hello World ****");
    source.output().send(MessageBuilder.withPayload(data).build());
  }
}

Original Stacktrace / Exception due to inappropriate use of @InboundChannelAdapter

2017-07-06 12:20:48.645 ERROR 69011 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Failed to invoke method; nested exception is java.lang.IllegalArgumentException: wrong number of arguments
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:119)
    at org.springframework.integration.endpoint.AbstractMessageSource.receive(AbstractMessageSource.java:134)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.receiveMessage(SourcePollingChannelAdapter.java:224)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:245)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:58)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:190)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:186)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:353)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:344)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IllegalArgumentException: wrong number of arguments
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:216)
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:201)
    at org.springframework.integration.endpoint.MethodInvokingMessageSource.doReceive(MethodInvokingMessageSource.java:116)
    ... 20 more

I have reviewed several different posts on here, but nothing seems to be helping. What might I be missing?

Update 1: I have made updates to the KafkaRoutingSlipSender class and the application.yml as shown above. With the contentType removed, these changes will publish a message like:

ÿcontentType?"application/x-java-object;type=com.maritz.avro.RoutingSlipMsg"FAKE_ACTIOÎTEST_CLIENÔNONÅTHEMÅABC12³

to the topic.

However, when I set the contentType to application/*+avro, the @Autowired Source appears to be casuing a NullPointerException.

New Stacktrace / Exception:

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is java.lang.NullPointerException
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=9a9ebc45-9431-8de5-1cb0-40d191082599, timestamp=1499438830456}]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:22)
    at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:51)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.schema.client.DefaultSchemaRegistryClient.register(DefaultSchemaRegistryClient.java:57)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
    ... 31 more

Update 2 (07-10-2017): I have fixed the null Source issue, thanks to this information:

Spring Boot: Configuration Class is simply ignored and not loaded

The ConfluentSchemaRegistryClient is now being used. I can step into the code when debugging, but it doesn't find the schema.

ConfluentSchemaRegistryClient.register(String subject, String format, String schema)

ResponseEntity<Map> response = this.template.exchange(this.endpoint + path, HttpMethod.POST, request, Map.class, new Object[0]);

New Stacktrace / Exception (07-10-2017):

org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'output'; nested exception is org.springframework.web.client.HttpClientErrorException: 404 Not Found
, failedMessage=GenericMessage [payload={"projectNumber": "ABC123", "clientName": "TEST_CLIENT", "programTheme": "THEME", "databaseName": "NONE", "actionType": "FAKE_ACTION"}, headers={id=73365130-a91b-ba9c-aaf0-77cfaa26f73d, timestamp=1499700409997}]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:449)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:373)
    at com.maritz.kafka.kafkademo.KafkaRoutingSlipSender.send(KafkaRoutingSlipSender.java:29)
    at com.maritz.kafka.kafkademo.KafkaDemoApplicationTests.testSend(KafkaDemoApplicationTests.java:52)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
    at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Caused by: org.springframework.web.client.HttpClientErrorException: 404 Not Found
    at org.springframework.web.client.DefaultResponseErrorHandler.handleError(DefaultResponseErrorHandler.java:63)
    at org.springframework.web.client.RestTemplate.handleResponse(RestTemplate.java:700)
    at org.springframework.web.client.RestTemplate.doExecute(RestTemplate.java:653)
    at org.springframework.web.client.RestTemplate.execute(RestTemplate.java:613)
    at org.springframework.web.client.RestTemplate.exchange(RestTemplate.java:531)
    at org.springframework.cloud.stream.schema.client.ConfluentSchemaRegistryClient.register(ConfluentSchemaRegistryClient.java:73)
    at org.springframework.cloud.stream.schema.avro.AvroSchemaRegistryClientMessageConverter.resolveSchemaForWriting(AvroSchemaRegistryClientMessageConverter.java:242)
    at org.springframework.cloud.stream.schema.avro.AbstractAvroMessageConverter.convertToInternal(AbstractAvroMessageConverter.java:174)
    at org.springframework.messaging.converter.AbstractMessageConverter.toMessage(AbstractMessageConverter.java:193)
    at org.springframework.cloud.stream.binding.MessageConverterConfigurer$ContentTypeConvertingInterceptor.preSend(MessageConverterConfigurer.java:253)
    at org.springframework.integration.channel.AbstractMessageChannel$ChannelInterceptorList.preSend(AbstractMessageChannel.java:538)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:415)
    ... 31 more

Is the register() supposed to be creating a new schema if that one doesn't exist, or is the 404 appropriate behavior? Quite confused.

Upvotes: 3

Views: 4247

Answers (3)

Vinicius Carvalho
Vinicius Carvalho

Reputation: 4156

We are working on the docs, should be ready soon. In the meantime just expose this on your application:

@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
    ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
    client.setEndpoint(endpoint);
    return client;
}

Upvotes: 1

Vinicius Carvalho
Vinicius Carvalho

Reputation: 4156

So what you see as application/x-java-object;type=com.maritz.avro.RoutingSlipMsg is because the default contentType for java objects is to serialize using Kryo, and that's what it set as contentType.

Since you want to use Confluent Schema Registry, that is not the default one, you need to setup a ConfluentSchemaRegistryClient bean, check it here how to enable it: http://docs.spring.io/spring-cloud-stream/docs/Ditmars.BUILD-SNAPSHOT/reference/htmlsingle/#_schema_registry_client

You now have a DefaultSchemaRegistryClient and it's probably having issues since the REST interface between the default server and confluent are not the same.

Upvotes: 3

Gary Russell
Gary Russell

Reputation: 174769

Caused by: java.lang.IllegalArgumentException: wrong number of arguments

An @InboundChannelAdapter method cannot have any parameters...

/**
 * Indicates that a method is capable of producing a {@link org.springframework.messaging.Message}
 * or {@link org.springframework.messaging.Message} {@code payload}.
 * <p>
 * A method annotated with {@code @InboundChannelAdapter} can't accept any parameters.
 * <p>    
 * Return values from the annotated method may be of any type. If the return
 * value is not a {@link org.springframework.messaging.Message}, a {@link org.springframework.messaging.Message}
 * will be created with that object as its {@code payload}.

...

It is polled for a message, or message payload.

Upvotes: 3

Related Questions