frm
frm

Reputation: 51

SerializationFailedException: Spring Cloud Stream Kinesis binder with kpl-kcl-enabled:true

I am currently evaluating the possibility of using spring cloud stream kinesis binder in a new project but I am having some issues.

When I have kcl-kpl-enabled: false everything works fine. However when I have kcl-kop-enabled I keep getting the following error:

org.springframework.core.serializer.support.SerializationFailedException: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is java.io.StreamCorruptedException: invalid stream header: 61686868

This is my current configuration:

spring:
  cloud:
    stream:
      kinesis:
        binder:
          checkpoint:
            create-delay: 0
            table: feeder_mycollection_changes_table
          kpl-kcl-enabled: true
        bindings:
          processEvent-in-0:
            consumer:
              shardIteratorType: TRIM_HORIZON
      bindings:
        processEvent-in-0:
          destination: mycollection_changes_stream
          content-type: application/json
          consumer:
            headerMode: none

The versions of the dependencies that I am using on my tests are:

<spring-cloud.version>Hoxton.RC2</spring-cloud.version>
<spring-cloud-stream.version>Horsham.RC2</spring-cloud-stream.version>
<spring-cloud-stream-kinesis.version>2.0.0.BUILD-SNAPSHOT</spring-cloud-stream-kinesis.version>

Upvotes: -1

Views: 839

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121177

The fix is here: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/commit/b64cd10c5b5aac209b61399f81e2801f24fbbaf4

The problem is that a default converter in the KclMessageDrivenChannelAdapter is a DeserializingConverter.

Spring Cloud Stream doesn't deal with Java serialization and it has its own mechanism to convert byte[]. So, we needed to fix a Binder implementation to rely on the Spring Cloud Stream conversion infrastructure.

Upvotes: 1

Related Questions