Andras Hatvani

Reputation: 4491

How to trust all packages when deserialising in Spring Cloud Stream?

This consumer didn't need trusted packages:

    @Bean
    fun berichtStateStoreBuilder() = Consumer<GlobalKTable<String, BerichtEvent>> {}

This suddenly does:

    @Bean
    fun berichtStateStoreBuilder() = Consumer<KStream<ByteArray, ByteArray>> {
        it
            .transform({ EventTypeAwareTransformer(EVENT_TYPE_MAPPING, objectMapper) })
            .mapValues { v -> v.payload as BerichtEvent }
            .groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde()))
            .aggregate(
                { BerichtAggregator() },
                { _, event, aggregator -> aggregator.add(event) },
                Named.`as`("aggregate"),
                Materialized.`as`<String, BerichtAggregator, KeyValueStore<Bytes, ByteArray>>(BerichtStore.NAME)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(JsonSerde(BerichtAggregator::class.java))
            )
    }

I tried the approaches listed after the stack trace (a custom header mapper bean and binder configuration properties), but none of them worked; I still get this error:

Caused by: java.lang.IllegalArgumentException: The class 'at.wrwks.smp.controlling.event.BerichtEvent' is not in the trusted packages: [java.util, java.lang]. If you believe this class is safe to deserialize, please provide its name. If the serialization is only done by a trusted source, you can also enable trust all (*).
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.getClassIdType(DefaultJackson2JavaTypeMapper.java:126)
    at org.springframework.kafka.support.converter.DefaultJackson2JavaTypeMapper.toJavaType(DefaultJackson2JavaTypeMapper.java:100)
    at org.springframework.kafka.support.serializer.JsonDeserializer.deserialize(JsonDeserializer.java:504)
    at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:55)
    at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66)
    ... 8 more

    @Bean
    fun defaultKafkaHeaderMapper(objectMapper: ObjectMapper): DefaultKafkaHeaderMapper {
        val mapper = DefaultKafkaHeaderMapper(objectMapper, "event_type")

        val rawMappedHeaders = HashMap<String, Boolean>()
        rawMappedHeaders[BaseEvent.EVENT_TYPE_HEADER] = true
        mapper.setRawMappedHeaders(rawMappedHeaders)
        mapper.addTrustedPackages("*")
        return mapper
    }

spring.cloud.stream.kafka.streams.binder.header-mapper-bean-name: defaultKafkaHeaderMapper
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.use.type.headers: false
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages: '*'

Spring Cloud Stream version: 3.1.2 with Kafka Streams binder.

Workaround by using a custom JSON serde:

            .groupByKey(Grouped.with(Serdes.StringSerde(), Serdes.serdeFrom(
                SimpleJsonSerializer(objectMapper), SimpleJsonDeserializer(objectMapper, BerichtEvent::class.java)
            )))

Upvotes: 1

Views: 871

Answers (2)

Andras Hatvani

Reputation: 4491

Although I've done this dozens of times, this time I forgot to pass the target class to the JsonSerde() constructor. This is correct:

.groupByKey(Grouped.with(Serdes.StringSerde(), JsonSerde(BerichtEvent::class.java)))

Apparently, when no class is passed, no package is added to the trusted packages. When a class is passed, the serde is configured to trust the package the target class belongs to.
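
To make the difference concrete, here is a minimal sketch of two ways to end up with a serde that trusts the event's package. It assumes spring-kafka is on the classpath; the BerichtEvent class below is only a hypothetical stand-in for the real event type, and the function names are made up for illustration.

```kotlin
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerde
import org.springframework.kafka.support.serializer.JsonSerializer

// Hypothetical stand-in for the real BerichtEvent from the question.
class BerichtEvent {
    var id: String = ""
}

// Variant 1: pass the target class, as in the fix above.
// The serde then knows the target type without relying on type headers.
fun typedSerde(): JsonSerde<BerichtEvent> =
    JsonSerde<BerichtEvent>(BerichtEvent::class.java)

// Variant 2: build the deserializer yourself and trust packages explicitly.
// "*" trusts everything; only use it when all producers are trusted.
fun trustAllSerde(): JsonSerde<BerichtEvent> {
    val deserializer = JsonDeserializer<BerichtEvent>(BerichtEvent::class.java)
    deserializer.addTrustedPackages("*")
    return JsonSerde(JsonSerializer<BerichtEvent>(), deserializer)
}
```

Either result could be passed as the value serde in Grouped.with(Serdes.StringSerde(), ...). Variant 1 is the shorter one and is probably enough unless several event packages have to be trusted.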

Upvotes: 1

Gary Russell

Reputation: 174699

I just tested it and it works fine for me...

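import java.util.function.Consumer;

import org.apache.kafka.streams.kstream.KStream;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication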
public class So67059860Application {

    public static void main(String[] args) {
        SpringApplication.run(So67059860Application.class, args);
    }

    @Bean
    public Consumer<KStream<String, Foo>> input() {
        return str -> str.foreach((k, v) -> System.out.println(v));
    }

}

public class Foo {

    private String bar;

    public String getBar() {
        return this.bar;
    }

    public void setBar(String bar) {
        this.bar = bar;
    }

    @Override
    public String toString() {
        return "Foo [bar=" + this.bar + "]";
    }

}

spring.cloud.stream.kafka.streams.binder.configuration.default.value.serde=org.springframework.kafka.support.serializer.JsonSerde

spring.cloud.stream.kafka.streams.binder.configuration.spring.json.trusted.packages=*
spring.cloud.stream.kafka.streams.binder.configuration.spring.json.value.default.type=com.example.demo.Foo
spring.application.name=so67059860

spring.cloud.function.definition=input

#logging.level.root=debug

Foo [bar=baz]

Boot 2.4.4, Cloud 2020.0.2 (SCSt 3.1.2).

Set a breakpoint in JsonSerde.configure() to see the properties being used.
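
One thing the breakpoint may reveal: as far as I understand, Kafka Streams only calls configure() on serdes it instantiates itself from configuration (such as the default value serde above), not on a serde you create with JsonSerde() and pass to Grouped or Materialized in your own code. Under that assumption, a hand-made serde would need the same property passed in manually. A rough sketch, with a made-up function name:

```kotlin
import org.springframework.kafka.support.serializer.JsonDeserializer
import org.springframework.kafka.support.serializer.JsonSerde

// Sketch: give a manually created serde the same trusted-packages setting
// that the binder property spring.json.trusted.packages would supply.
fun manuallyConfiguredSerde(): JsonSerde<Any> {
    val serde = JsonSerde<Any>()
    // JsonDeserializer.TRUSTED_PACKAGES is the "spring.json.trusted.packages" key.
    val props = mapOf(JsonDeserializer.TRUSTED_PACKAGES to "*")
    // false = this serde is used for record values, not keys.
    serde.configure(props, false)
    return serde
}
```

Without a default type, a serde configured like this still depends on the type headers written by the producer to pick the target class.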

Upvotes: 1
