Reputation: 51
I am evaluating the Spring Cloud Stream Kinesis binder for a new project, but I am running into an issue.
With kpl-kcl-enabled: false everything works fine. However, with kpl-kcl-enabled: true I keep getting the following error:
org.springframework.core.serializer.support.SerializationFailedException: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is java.io.StreamCorruptedException: invalid stream header: 61686868
This is my current configuration:
spring:
  cloud:
    stream:
      kinesis:
        binder:
          checkpoint:
            create-delay: 0
            table: feeder_mycollection_changes_table
          kpl-kcl-enabled: true
        bindings:
          processEvent-in-0:
            consumer:
              shardIteratorType: TRIM_HORIZON
      bindings:
        processEvent-in-0:
          destination: mycollection_changes_stream
          content-type: application/json
          consumer:
            headerMode: none
The dependency versions I am using in my tests are:
<spring-cloud.version>Hoxton.RC2</spring-cloud.version>
<spring-cloud-stream.version>Horsham.RC2</spring-cloud-stream.version>
<spring-cloud-stream-kinesis.version>2.0.0.BUILD-SNAPSHOT</spring-cloud-stream-kinesis.version>
Upvotes: -1
Views: 839
Reputation: 121177
The fix is here: https://github.com/spring-cloud/spring-cloud-stream-binder-aws-kinesis/commit/b64cd10c5b5aac209b61399f81e2801f24fbbaf4
The problem is that the default converter in the KclMessageDrivenChannelAdapter is a DeserializingConverter.
Spring Cloud Stream doesn't deal with Java serialization; it has its own mechanism for converting byte[]. So we needed to fix the Binder implementation to rely on the Spring Cloud Stream conversion infrastructure.
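To see why the exception mentions an invalid stream header, here is a minimal sketch using only the JDK. It assumes the DeserializingConverter ends up reading the record through a standard java.io.ObjectInputStream (which is what the DefaultDeserializer named in the error does, as far as I know). The class and method names are hypothetical, and the "ahhh" payload is inferred from the hex value 61686868 in the reported error, which is ASCII for "ahhh".

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of the binder or of Spring.
class StreamHeaderDemo {

    // Tries to open the bytes as a Java-serialized stream.
    // Returns the exception message if the stream header is rejected,
    // or null if the header looks like real Java serialization.
    static String tryJavaDeserialization(byte[] payload) throws IOException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) throws IOException {
        // Plain text bytes, as a producer that does not use Java serialization would send them.
        byte[] payload = "ahhh".getBytes(StandardCharsets.US_ASCII);
        System.out.println(tryJavaDeserialization(payload)); // invalid stream header: 61686868
    }
}
```

The ObjectInputStream constructor checks the first four bytes for the serialization magic number and version, so any payload that was not written by an ObjectOutputStream (plain text, JSON, and so on) fails before a single object is read. That is why the record has to go through the Spring Cloud Stream message conversion instead.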
Upvotes: 1