codependent

java.lang.ClassCastException: class com.sun.proxy.$Proxy143 cannot be cast to class org.springframework.messaging.MessageChannel

I am writing tests for a Spring Cloud Stream application that has a KStream reading from topicA. In the test I use a KafkaTemplate to publish messages and then wait for the KStream's log output to appear.

The tests throw the following exception:

java.lang.ClassCastException: class com.sun.proxy.$Proxy143 cannot be cast to class org.springframework.messaging.MessageChannel (com.sun.proxy.$Proxy143 and org.springframework.messaging.MessageChannel are in unnamed module of loader 'app')
    at org.springframework.cloud.stream.test.binder.TestSupportBinder.bindConsumer(TestSupportBinder.java:66) ~[spring-cloud-stream-test-support-3.0.1.RELEASE.jar:3.0.1.RELEASE]
    at org.springframework.cloud.stream.binding.BindingService.doBindConsumer(BindingService.java:169) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.BindingService.bindConsumer(BindingService.java:115) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.AbstractBindableProxyFactory.createAndBindInputs(AbstractBindableProxyFactory.java:112) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at org.springframework.cloud.stream.binding.InputBindingLifecycle.doStartWithBindable(InputBindingLifecycle.java:58) ~[spring-cloud-stream-3.0.2.BUILD-SNAPSHOT.jar:3.0.2.BUILD-SNAPSHOT]
    at java.base/java.util.LinkedHashMap$LinkedValues.forEach(LinkedHashMap.java:608) ~[na:na]

This exception does not occur when the application runs normally; it only appears in the tests.

KSTREAM:

@Configuration
class MyKStream {

    private val logger = LoggerFactory.getLogger(javaClass)

    // Functional-style consumer; its input binding is named processSomething-in-0
    @Bean
    fun processSomething(): Consumer<KStream<XX, XX>> {
        return Consumer { something ->
            something.foreach { key, value ->
                logger.info("--------> Processing xxx key {} - value {}", key, value)
            }
        }
    }
}

TEST:

@TestInstance(PER_CLASS)
@EmbeddedKafka
@SpringBootTest(properties = [
    "spring.profiles.active=local",
    "schema-registry.user=",
    "schema-registry.password=",
    "spring.cloud.stream.bindings.processSomething-in-0.destination=topicA",
    "spring.cloud.stream.bindings.processSomething-in-0.producer.useNativeEncoding=true",
    "spring.cloud.stream.bindings.processSomethingElse-in-0.destination=topicB",
    "spring.cloud.stream.bindings.processSomethingElse-in-0.producer.useNativeEncoding=true",
    "spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
    "spring.cloud.stream.function.definition=processSomething;processSomethingElse"])
class MyKStreamTests {

    private val logger = LoggerFactory.getLogger(javaClass)

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Autowired
    private lateinit var schemaRegistryMock: SchemaRegistryMock

    @AfterAll
    fun afterAll() {
        embeddedKafka.kafkaServers.forEach { it.shutdown() }
        embeddedKafka.kafkaServers.forEach { it.awaitShutdown() }
    }

    @Test
    fun `should send and process something`() {

        val producer = createProducer()
        logger.debug("**********----> presend")
        val msg = MessageBuilder.withPayload(xxx)
                .setHeader(KafkaHeaders.MESSAGE_KEY, xxx)
                .setHeader(KafkaHeaders.TIMESTAMP, 1L)
                .build()
        producer.send(msg).get()
        logger.debug("**********----> sent")

        Thread.sleep(100000)
    }
}

@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {

    private val schemaRegistryMock = SchemaRegistryMock()

    @PostConstruct
    fun init() {
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaBroker.brokersAsString)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaBroker.brokersAsString)
        schemaRegistryMock.start()
        System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url", schemaRegistryMock.url)
    }

    @Bean
    fun schemaRegistryMock(): SchemaRegistryMock {
        return schemaRegistryMock
    }

    @PreDestroy
    fun preDestroy() {
        schemaRegistryMock.stop()
    }
}


Answers (1)

Zakhooi

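You are probably using spring-cloud-stream-test-support as a dependency. That dependency bypasses some of the core functionality of the binder API: its TestSupportBinder, which appears in your stack trace at TestSupportBinder.bindConsumer, handles the binding and tries to treat the bound target as a MessageChannel. The cast fails because the target for a KStream binding is a proxy, not a MessageChannel. Removing that dependency from the test classpath should let the Kafka Streams binder handle the binding, as it does when the application runs normally. The testing section of the reference documentation describes the test binder: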
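To illustrate the failure mode in isolation, here is a minimal sketch that uses no Spring classes at all. StreamLike and ChannelLike are hypothetical stand-ins for KStream and MessageChannel, and the assumption is that the $Proxy143 in the stack trace is a JDK dynamic proxy that implements the stream interface only. Casting such a proxy to an interface it does not implement throws ClassCastException:

```kotlin
import java.lang.reflect.Proxy

// Hypothetical stand-ins for KStream and MessageChannel; not the real types.
interface StreamLike { fun describe(): String }
interface ChannelLike { fun send(payload: String): Boolean }

// Builds a JDK dynamic proxy that implements only StreamLike.
fun newStreamProxy(): Any =
    Proxy.newProxyInstance(
        StreamLike::class.java.classLoader,
        arrayOf(StreamLike::class.java)
    ) { _, method, _ ->
        if (method.name == "describe") "stream proxy" else null
    }

// Returns true when casting the target to the unrelated interface throws.
fun castToChannelFails(target: Any): Boolean =
    try {
        (target as ChannelLike).send("ping")
        false
    } catch (e: ClassCastException) {
        true
    }

fun main() {
    val proxy = newStreamProxy()
    println(proxy is StreamLike)        // the proxy does implement StreamLike
    println(castToChannelFails(proxy))  // the cast to ChannelLike is rejected
}
```

The same kind of mismatch would explain why the error only shows up in tests: the cast is attempted by the test binder, which is not on the classpath in a normal run.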

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.3.RELEASE/reference/html/spring-cloud-stream.html#_testing
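If the project is built with Gradle and the Kotlin DSL (an assumption, since the question does not show the build file), removing the test binder could look roughly like the sketch below. The exclude block is only needed if the artifact arrives transitively rather than through a direct declaration.

```kotlin
// build.gradle.kts (sketch; assumes a Gradle Kotlin DSL build)
dependencies {
    // Delete this line if it is present in your build:
    // testImplementation("org.springframework.cloud:spring-cloud-stream-test-support")
}

configurations.all {
    // Keeps the test binder off every classpath even when another
    // dependency pulls it in transitively.
    exclude(group = "org.springframework.cloud", module = "spring-cloud-stream-test-support")
}
```

With the test binder gone, the test should bind against the embedded Kafka broker through the Kafka Streams binder, which is what the @EmbeddedKafka setup in the question appears to expect.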
