WiredCoder
WiredCoder

Reputation: 926

StreamListener cannot seem to work with Message type

We have an application the consumes messages from Kafka and process it. We are using Spring boot 2.2.6.RELEASE, and Spring cloud Hoxton.SR4.

I am trying to receive a simple message:

{
  "payload": {
    "config": {
      "credentials": {}
    },
    "id": "est-00001",
    "merchantKey": "test-00001",
    "name": "Test",
    "version": 7,
    "type": "PARTNER",
    "vendorNumber": "14"
  },
  "metadata": {
    "timestamp": -1,
    "partition": 8,
    "key": "test-00001",
    "offset": 105,
    "topic": "configure",
    "headers": []
  },
  "key": "00001",
  "messageType": "dent.set",
  "id": "abf75248-6fb0-4b57-a92c-74d4d3143cc0",
  "time": "2018-03-16T15:56Z"
}

And this is the model I am using to deserialise the message


package com.commercetools.tuev.marketplace.merchant.model;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonPropertyDescription;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Map;

@JsonInclude(JsonInclude.Include.NON_NULL)
@Data
@AllArgsConstructor
public class MerchantMessage {
    public MerchantMessage(){}

    @JsonProperty("id")
    @JsonPropertyDescription("The message id, usually a random UUID")
    private String id;

    @JsonProperty("time")
    @JsonPropertyDescription("ISO-8501 timestamp of the event")
    private String time;

    @JsonProperty("key")
    @JsonPropertyDescription("The key has to always match the entity's id (product id)")
    private String key;

    @JsonProperty("messageType")
    @JsonPropertyDescription("The Message Type")
    private String messageType;

    @JsonProperty("payload")
    private Map<String, Object> payload;

    @JsonProperty("metadata")
    private Map<String, Object> metadata;
}

and all I get is the following exception:

Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: Cannot construct instance of `org.springframework.messaging.Message` (no Creators, like default construct, exist): abstract types either need to be mapped to concrete types, have custom deserializer, or contain additional type information

application.properties

spring.cloud.stream.bindings.merchantInput.destination=merchantTopic
spring.cloud.stream.bindings.merchantInput.group=consumerGroup
spring.cloud.stream.bindings.merchantInput.consumer.header-mode=none
spring.cloud.stream.bindings.merchantInput.content-type=application/json
spring.cloud.stream.kafka.bindings.merchantInput.consumer.autoCommitOffset=false
spring.cloud.stream.bindings.merchantInput.consumer.partitioned=true
spring.cloud.stream.bindings.merchantInput.consumer.max-attempts=1

Function

    @StreamListener(value = MerchantProcessor.INPUT)
    public void manage(Flux<Message<MerchantMessage>> message) {
        message
                .map(Message::getPayload)
                .doOnNext(System.out::println)
                .subscribe(payload-> System.out.println("Consumed: "+payload));
    }

If I remove the flux, everything just works fine:

    @StreamListener(value = MerchantProcessor.INPUT)
    public void manage(Message<MerchantMessage> message) {
        Mono.just(message)
                .map(Message::getPayload)
                .doOnNext(System.out::println)
                .subscribe(payload -> System.out.println("Consumed: " + payload));
    }

Upvotes: 0

Views: 1042

Answers (1)

Oleg Zhurakousky
Oleg Zhurakousky

Reputation: 6126

The @StreamListener and annotation-based programming model has been deprecated for a while now and for the past several years we have fully migrated to a functional programming model which requires no annotations.

You can simply change your code to be

@Bean
Consumer<Flux<Message<MerchantMessage>> message> {
   return flux -> flux
                .map(Message::getPayload)
                .doOnNext(System.out::println)
                .subscribe(payload-> System.out.println("Consumed: "+payload));
}

Your input binding by convention is named message-in-0. You can get more information here as well.

Upvotes: 2

Related Questions