Reputation: 1221
When using the Kafka integration and configuring a QueueChannel, the messages received from the queue channel are processed sequentially, with a delay of one second each, and I cannot understand why. The queue channel should accumulate messages (up to the configured limit) and release them as long as the queue is not empty and there is a consumer. Why are the messages released sequentially with a one-second delay?
The log follows. As can be seen, the messages are received immediately (according to the log timestamps) but are processed sequentially with a delay of one second:

2020-04-06 13:08:28.108 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 2 - enriched
2020-04-06 13:08:28.109 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 2 - enriched
2020-04-06 13:08:28.110 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 7 - enriched
2020-04-06 13:08:28.111 INFO 30718 --- [ntainer#0-0-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 5 - enriched
2020-04-06 13:08:28.116 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 6 - enriched
2020-04-06 13:08:28.119 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 4 - enriched
2020-04-06 13:08:28.120 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 1 - enriched
2020-04-06 13:08:28.121 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 8 - enriched
2020-04-06 13:08:28.122 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 3 - enriched
2020-04-06 13:08:28.123 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 9 - enriched
2020-04-06 13:08:28.124 INFO 30718 --- [ntainer#0-1-C-1] o.s.integration.handler.LoggingHandler : readKafkaChannel: item: 10 - enriched
2020-04-06 13:08:29.111 INFO 30718 --- [ask-scheduler-2] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 7 - enriched
2020-04-06 13:08:30.112 INFO 30718 --- [ask-scheduler-4] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 5 - enriched
2020-04-06 13:08:31.112 INFO 30718 --- [ask-scheduler-1] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 6 - enriched
2020-04-06 13:08:32.113 INFO 30718 --- [ask-scheduler-5] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 4 - enriched
2020-04-06 13:08:33.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 1 - enriched
2020-04-06 13:08:34.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 8 - enriched
2020-04-06 13:08:35.113 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 3 - enriched
2020-04-06 13:08:36.114 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 9 - enriched
2020-04-06 13:08:37.114 INFO 30718 --- [ask-scheduler-3] o.s.integration.handler.LoggingHandler : channelThatIsProcessingSequential - item: 10 - enriched
package br.com.gubee.kafaexample

import org.apache.kafka.clients.admin.NewTopic
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.MediaType
import org.springframework.integration.annotation.Gateway
import org.springframework.integration.annotation.MessagingGateway
import org.springframework.integration.config.EnableIntegration
import org.springframework.integration.context.IntegrationContextUtils
import org.springframework.integration.dsl.IntegrationFlow
import org.springframework.integration.dsl.IntegrationFlows
import org.springframework.integration.kafka.dsl.Kafka
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.listener.ContainerProperties
import org.springframework.scheduling.annotation.Async
import org.springframework.stereotype.Component
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController
@RequestMapping(path = ["/testKafka"], produces = [MediaType.APPLICATION_JSON_VALUE])
class TestKafkaResource(private val testKafkaGateway: TestKafkaGateway) {

    @GetMapping("init/{param}")
    fun init(@PathVariable("param", required = false) param: String? = null) {
        (1..10).forEach {
            println("Send async item $it")
            testKafkaGateway.init("item: $it")
        }
    }
}

@MessagingGateway(errorChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
@Component
interface TestKafkaGateway {

    @Gateway(requestChannel = "publishKafkaChannel")
    @Async
    fun init(param: String)
}

@Configuration
@EnableIntegration
class TestKafkaFlow(private val kafkaTemplate: KafkaTemplate<*, *>,
                    private val consumerFactory: ConsumerFactory<*, *>) {

    @Bean
    fun readKafkaChannelTopic(): NewTopic {
        return NewTopic("readKafkaChannel", 40, 1)
    }

    @Bean
    fun publishKafka(): IntegrationFlow {
        return IntegrationFlows
            .from("publishKafkaChannel")
            .transform<String, String> { "${it} - enriched" }
            .handle(
                Kafka.outboundChannelAdapter(kafkaTemplate)
                    .topic("readKafkaChannel")
                    .sendFailureChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
            )
            .get()
    }

    @Bean
    fun readFromKafka(): IntegrationFlow {
        return IntegrationFlows
            .from(
                Kafka.messageDrivenChannelAdapter(consumerFactory, "readKafkaChannel")
                    .configureListenerContainer { kafkaMessageListenerContainer ->
                        kafkaMessageListenerContainer.concurrency(2)
                        kafkaMessageListenerContainer.ackMode(ContainerProperties.AckMode.RECORD)
                    }
                    .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
            )
            .channel { c -> c.queue(10) }
            .log<String> {
                "readKafkaChannel: ${it.payload}"
            }
            .channel("channelThatIsProcessingSequential")
            .get()
    }

    @Bean
    fun kafkaFlowAfter(): IntegrationFlow {
        return IntegrationFlows
            .from("channelThatIsProcessingSequential")
            .log<String> {
                "channelThatIsProcessingSequential - ${it.payload}"
            }
            .get()
    }
}
Upvotes: 1
Views: 270
Reputation: 121552
As Gary said, it is not a good idea to shift Kafka messages into a QueueChannel. The consumption on the Kafka.messageDrivenChannelAdapter() is already async, so there is really no reason to move messages to a separate thread.
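Dropping the queue channel keeps the downstream handlers on the (already concurrent) listener container threads. A minimal sketch of that variant, reusing the bean and channel names from the question, might look like:

```kotlin
@Bean
fun readFromKafka(): IntegrationFlow {
    return IntegrationFlows
        .from(
            Kafka.messageDrivenChannelAdapter(consumerFactory, "readKafkaChannel")
                .configureListenerContainer { container ->
                    container.concurrency(2)
                    container.ackMode(ContainerProperties.AckMode.RECORD)
                }
                .errorChannel(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
        )
        // No .channel { c -> c.queue(10) } here: without a pollable channel
        // there is no task scheduler (and no poll interval) in the flow.
        .log<String> { "readKafkaChannel: ${it.payload}" }
        .channel("channelThatIsProcessingSequential")
        .get()
}
```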
Anyway, it looks like you use Spring Cloud Stream with its PollerMetadata configured for a one-message-per-second polling policy.
If that doesn't fit your requirements, you can always change that .channel { c -> c.queue(10) } to use a second lambda and configure a custom poller over there.
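Another option (a sketch, assuming the Pollers factory and the PollerMetadata.DEFAULT_POLLER bean name from the Spring Integration Java DSL) is to register a default poller bean that polls more often and drains more than one message per poll:

```kotlin
import org.springframework.context.annotation.Bean
import org.springframework.integration.dsl.PollerSpec
import org.springframework.integration.dsl.Pollers
import org.springframework.integration.scheduling.PollerMetadata

// Overrides the default poller used by endpoints consuming from
// pollable channels, such as the queue channel in the question.
@Bean(PollerMetadata.DEFAULT_POLLER)
fun defaultPoller(): PollerSpec =
    Pollers.fixedDelay(100)        // poll every 100 ms instead of every second
        .maxMessagesPerPoll(10)    // take up to 10 messages on each poll
```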
BTW, we already have a Kotlin DSL implementation in Spring Integration: https://docs.spring.io/spring-integration/docs/5.3.0.M4/reference/html/kotlin-dsl.html#kotlin-dsl
Upvotes: 3