Reputation: 4491
This consumer didn't need trusted packages:
@Bean
fun berichtStateStoreBuilder() = Consumer<GlobalKTable<String, BerichtEvent>> {}
This one suddenly does:
@Bean
fun berichtStateStoreBuilder() = Consumer<KStream<ByteArray, ByteArray>> {
it
.transform({ EventTypeAwareTransformer(EVENT_TYPE_MAPPING, objectMapper) })
.mapValues { v -> v.payload as BerichtEvent }
.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde()))
.aggregate(
{ BerichtAggregator() },
{ _, event, aggregator -> aggregator.add(event) },
Named.`as`("aggregate"),
Materialized.`as`<String, BerichtAggregator, KeyValueStore<Bytes, ByteArray>>(BerichtStore.NAME)
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde(BerichtAggregator::class.java))
)
}
I've tried the approaches shown after the stack trace below, but none of them worked; I still get this error:
Caused by: java.lang.IllegalArgumentException: The class 'at.wrwks.smp.controlling.event.BerichtEvent' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126)
at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100)
at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504)
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55)
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
... 8 more
@Bean
fun defaultKafkaHeaderMapper(objectMapper: ObjectMapper): DefaultKafkaHeaderMapper {
val mapper = DefaultKafkaHeaderMapper(objectMapper, "event_type")
val rawMappedHeaders = HashMap<String, Boolean>()
rawMappedHeaders[BaseEvent.EVENT_TYPE_HEADER] = true
mapper.setRawMappedHeaders(rawMappedHeaders)
mapper.addTrustedPackages("*")
return mapper
}
spring.cloud.stream.kafka.streams.binder.header-mapper-bean-name: defaultKafkaHeaderMapper
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.use.type.headers: false
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages: '*'
Spring Cloud Stream version: 3.1.2 with Kafka Streams binder.
Workaround by using a custom JSON serde:
.groupByKey(Grouped.with(Serdes.StringSerde(), Serdes.serdeFrom(
SimpleJsonSerializer(objectMapper), SimpleJsonDeserializer(objectMapper, BerichtEvent::class.java)
)))
Upvotes: 1
Views: 871
Reputation: 4491
Although I've done this dozens of times, this time I forgot to pass the target class to the JsonSerde() constructor. This is correct:
.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(BerichtEvent::class.java)))
Apparently, when no class is passed, no package is added to the trusted packages. When a class is passed, the Serde is configured to trust the package the target class belongs to.
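To make that explanation concrete, here is a small self-contained sketch of the behaviour as I understand it. It is a simplified model, not spring-kafka's actual code: the class TrustedPackagesSketch and its methods are hypothetical names made up for illustration, and class names are passed as plain strings so the example runs without any Kafka or Spring dependency.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical, simplified model of a trusted-packages check.
// NOT the spring-kafka implementation; it only mirrors the behaviour described above.
final class TrustedPackagesSketch {

    private final Set<String> trustedPackages = new LinkedHashSet<>();

    // No target type given: only the defaults listed in the error message are trusted.
    TrustedPackagesSketch() {
        trustedPackages.add("java.util");
        trustedPackages.add("java.lang");
    }

    // Target type given: its package is trusted in addition to the defaults.
    TrustedPackagesSketch(String targetClassName) {
        this();
        trustedPackages.add(packageOf(targetClassName));
    }

    // A class is trusted if "*" is configured or its package is in the set.
    boolean isTrusted(String className) {
        return trustedPackages.contains("*")
                || trustedPackages.contains(packageOf(className));
    }

    // Everything before the last dot is treated as the package name.
    static String packageOf(String className) {
        int lastDot = className.lastIndexOf('.');
        return lastDot < 0 ? "" : className.substring(0, lastDot);
    }

    public static void main(String[] args) {
        String event = "at.wrwks.smp.controlling.event.BerichtEvent";
        // Like JsonSerde(): the event's package is unknown to the mapper.
        System.out.println(new TrustedPackagesSketch().isTrusted(event));      // false
        // Like JsonSerde(BerichtEvent::class.java): the package is trusted.
        System.out.println(new TrustedPackagesSketch(event).isTrusted(event)); // true
    }
}
```

In this model the no-argument variant rejects BerichtEvent because only java.util and java.lang are trusted, which matches the list printed in the exception above.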
Upvotes: 1
Reputation: 174699
I just tested it and it works fine for me...
@SpringBootApplication
public class So67059860Application {
public static void main(String[] args) {
SpringApplication.run(So67059860Application.class, args);
}
@Bean
public Consumer<KStream<String, Foo>> input() {
return str -> str.foreach((k, v) -> System.out.println(v));
}
}
public class Foo {
private String bar;
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
@Override
public String toString() {
return "Foo [bar=" + this.bar + "]";
}
}
spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages=*
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.value.default.type=com.example.demo.Foo
spring.application.name=so67059860
spring.cloud.function.definition=input
#logging.level.root=debug
Foo [bar=baz]
Boot 2.4.4, Cloud 2020.0.2 (SCSt 3.1.2).
Set a breakpoint in JsonSerde.configure() to see the properties being used.
Upvotes: 1