Reputation: 96
I have two Kafka topics, my_priorized_topic and my_not_so_priorized_topic. I want a mutex on EventProcessor.doLogic, and I always want to handle messages from my_priorized_topic before messages from my_not_so_priorized_topic.
Can anyone give me some pointers on how to solve this with Kotlin, maybe with coroutines?
class EventProcessor {
    fun doLogic(message: String) {
        ... // code which cannot be parallelized
    }
}

class KafkaConsumers(private val eventProcessor: EventProcessor) {
    @KafkaConsumer(topic = "my_priorized_topic")
    fun consumeFromPriorizedTopic(message: String) {
        eventProcessor.doLogic(message)
    }

    @KafkaConsumer(topic = "my_not_so_priorized_topic")
    fun consumeFromNotSoPrioritizedTopic(message: String) {
        eventProcessor.doLogic(message)
    }
}
Upvotes: 3
Views: 928
Reputation: 4062
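You could create two Channels, one for your high priority events and one for your low priority events. Then, to consume the events from the channels, use the coroutines select expression and put the high priority channel first. select is biased towards its first clause, so when both channels have an event ready, the high priority one is taken.
Example (the String is the event):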
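import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.selects.select

fun process(value: String) {
    // do what you want with the event
}

// The high priority clause comes first, so it wins when both channels are ready.
suspend fun selectFromHighAndLow(
    highPriorityChannel: ReceiveChannel<String>,
    lowPriorityChannel: ReceiveChannel<String>
): String =
    select<String> {
        highPriorityChannel.onReceive { value -> value }
        lowPriorityChannel.onReceive { value -> value }
    }

val highPriorityChannel = Channel<String>()
val lowPriorityChannel = Channel<String>()

// Call this from a single coroutine so that events are processed one at a time.
suspend fun processEvents() {
    while (true) {
        process(selectFromHighAndLow(highPriorityChannel, lowPriorityChannel))
    }
}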
To send events to those channels, call channel.send(event) from a coroutine; send is a suspending function.
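To see the ordering in action without a Kafka broker, here is a minimal sketch. RecordingProcessor, receivePrioritized and runDemo are hypothetical names made up for this illustration, and the unlimited buffer is an assumption so the sends never suspend. In a real application the sends would come from the two listener methods instead.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical stand-in for EventProcessor: records the order in which events are handled.
class RecordingProcessor {
    val processed = mutableListOf<String>()
    fun doLogic(message: String) {
        processed += message
    }
}

// select is biased towards its first clause, so `high` wins when both channels have data.
suspend fun receivePrioritized(
    high: ReceiveChannel<String>,
    low: ReceiveChannel<String>
): String =
    select {
        high.onReceive { it }
        low.onReceive { it }
    }

fun runDemo(): List<String> = runBlocking {
    // Assumption: unlimited buffers, so producers are never blocked by the consumer.
    val high = Channel<String>(Channel.UNLIMITED)
    val low = Channel<String>(Channel.UNLIMITED)
    val processor = RecordingProcessor()

    // Low priority events arrive first, high priority events arrive afterwards.
    low.send("low-1")
    low.send("low-2")
    high.send("high-1")
    high.send("high-2")

    // A single consumer loop means doLogic is never called concurrently.
    repeat(4) { processor.doLogic(receivePrioritized(high, low)) }
    processor.processed.toList()
}

fun main() {
    println(runDemo()) // prints [high-1, high-2, low-1, low-2]
}
```

Because only one coroutine calls doLogic, no separate mutex is needed in this sketch. If the listener methods are not suspending functions, trySend on the channel may be an option instead of send, but whether that fits depends on how your Kafka framework invokes the listeners.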
Upvotes: 3