I am trying to consume messages wrapped in CloudEvents v1 envelopes in Kotlin.
I checked the documentation and applied the same approach as in the Java examples.
My event is built as:
CloudEventBuilder.v1()
    .withId(UUID.randomUUID().toString())
    .withSource(URI.create(EVENT_SOURCE))
    .withType(configs.mapTypes[topic])
    .withTime(OffsetDateTime.now())
    .withData(objectMapper.writeValueAsBytes(result))
    .withDataContentType("application/json")
    .build()
My consumer properties are as follows:
val props = Properties()
props[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = KAFKA.bootstrapServers
props[ConsumerConfig.GROUP_ID_CONFIG] = groupId
props[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = OffsetResetStrategy.EARLIEST.name.lowercase()
props[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
props[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = CloudEventDeserializer::class.java
return props
My data class:
data class Test(
    val id: String,
    val score: Double
)
When I try to convert the data to an object using Jackson's ObjectMapper and CloudEventUtils.mapData as follows:
val MAPPER: ObjectMapper = jacksonObjectMapper()
    .enable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS)
    .disable(SerializationFeature.WRITE_DATE_TIMESTAMPS_AS_NANOSECONDS)
    .registerModule(JavaTimeModule())
val data = CloudEventUtils.mapData(
    message[0],
    PojoCloudEventDataMapper.from(MAPPER, Test::class.java)
)
I get the error:
errorType: DATA_CONVERSION
detailMessage: Error while trying to convert data from class com.fasterxml.jackson.databind.JsonNode to [simple type, class com.trendyol.mergetopicsstream.fixtures.Test]
cause: java.lang.IllegalArgumentException: Cannot construct instance of `com.trendyol.mergetopicsstream.fixtures.Test` (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('{"id":"Zagxn","score":0.354854009767147}')
at [Source: UNKNOWN; byte offset: #UNKNOWN]
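For what it's worth, the same Jackson message can be reproduced without CloudEvents at all, which may help narrow down where the problem sits. The sketch below is only an illustration under assumptions: it assumes the node handed to the mapper is a JSON string node (a TextNode) whose text is itself JSON, which is what "deserialize from String value" suggests but which I have not confirmed for the actual pipeline. The helper names `convert` and `unwrapAndConvert` are hypothetical, and jackson-module-kotlin is assumed to be on the classpath.

```kotlin
import com.fasterxml.jackson.databind.JsonNode
import com.fasterxml.jackson.databind.node.TextNode
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper

data class Test(val id: String, val score: Double)

private val mapper = jacksonObjectMapper()

// Hypothetical helper: converts a tree node to Test via ObjectMapper.convertValue.
fun convert(node: JsonNode): Test = mapper.convertValue(node, Test::class.java)

// Hypothetical helper: if the node is a string holding JSON text,
// parse that text first; otherwise convert the node directly.
fun unwrapAndConvert(node: JsonNode): Test =
    if (node.isTextual) mapper.readValue(node.asText(), Test::class.java)
    else convert(node)

fun main() {
    val json = """{"id":"Zagxn","score":0.354854009767147}"""

    // An object node converts without problems.
    println(convert(mapper.readTree(json)))

    // The same JSON held as a string value fails: Jackson looks for a
    // String-argument creator on Test and does not find one.
    try {
        convert(TextNode.valueOf(json))
    } catch (e: IllegalArgumentException) {
        println(e.message)
    }

    // Parsing the text of the string node first makes the conversion succeed.
    println(unwrapAndConvert(TextNode.valueOf(json)))
}
```

If the second case matches what happens here, the data is probably being encoded as a JSON string somewhere between the producer and the consumer, so inspecting the raw record value on the topic would be a reasonable next step.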