Omri Gelman

Reputation: 63

Spring Cloud Stream Kafka 2.0 - StreamListener with condition

I'm trying to create a consumer using the StreamListener annotation with its condition attribute. However, I'm getting the following exception:

org.springframework.core.convert.ConversionFailedException: Failed to convert from type [java.lang.String] to type [java.lang.Integer] for value 'test'; nested exception is java.lang.NumberFormatException: For input string: "test"

TestListener:

@StreamListener(target = ITestSink.CHANNEL_NAME, condition = "payload['test'] == 'test'")
public void test(@Payload TestObj message) {
    log.info("message is {}", message.getName());
}

TestObj:

@Data
@ToString(callSuper=true)
public class TestObj {

    @JsonProperty("test")
    private String test;

    @JsonProperty("name")
    private String name;

}

Can someone help with this issue?

Upvotes: 1

Views: 1906

Answers (2)

madhu pathy

Reputation: 474

When the condition is evaluated, the payload of the message has not yet been converted from the wire format (byte[]) to the desired type. In other words, it has not yet gone through the type conversion process described in the Content Type Negotiation section of the documentation. That is why payload['test'] fails: SpEL tries to index the byte array, so it attempts to convert 'test' to an Integer.

So, unless you use a SpEL expression that evaluates raw data (for example, the value of the first byte in the byte array), use message header-based expressions (such as condition = "headers['type']=='dog'").

Example:

@StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'")
public void receiveBogey(@Payload BogeyPojo bogeyPojo) {
    // handle the message
}
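
To see why a header-based condition works while payload['test'] throws, here is a rough plain-Java model of the two cases. It is only a sketch and not Spring's actual implementation: the class ConditionSketch, its two helper methods, and the sample JSON value "foo" are all hypothetical, and the only assumption it illustrates is that the condition sees a header map plus a raw byte[] payload.

```java
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Plain-Java model of condition evaluation; NOT Spring's real code.
class ConditionSketch {

    // Header-based condition: headers are a map, so a string key lookup works
    // before the payload has been converted.
    static boolean headerMatches(Map<String, Object> headers, String key, String expected) {
        return expected.equals(headers.get(key));
    }

    // Payload-based condition against the raw byte[]: an array can only be
    // indexed by an integer, so the string index has to be converted first.
    static byte indexRawPayload(byte[] payload, String index) {
        return payload[Integer.parseInt(index)]; // NumberFormatException for "test"
    }

    public static void main(String[] args) {
        // Hypothetical wire-format payload and header, for illustration only.
        byte[] payload = "{\"test\":\"test\",\"name\":\"foo\"}".getBytes(StandardCharsets.UTF_8);
        Map<String, Object> headers = Collections.<String, Object>singletonMap("type", "test");

        System.out.println("header condition matches: " + headerMatches(headers, "type", "test"));

        try {
            indexRawPayload(payload, "test");
        } catch (NumberFormatException e) {
            System.out.println("payload condition failed: " + e.getMessage());
        }
    }
}
```

In practice this means the producer would need to put the routing value into a message header (for example a 'type' header) so that the listener's condition can read it without touching the payload.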

Check the Spring documentation here.

Upvotes: 1

Warren Zhu

Reputation: 1495

From what you have shown, it should work. I suggest removing the condition and setting a breakpoint in the listener to debug. That should tell you what the actual type of the payload is.

Upvotes: 0
