Martin Agfjord

Reputation: 96

How to achieve mutex on method in Kotlin and prioritize one thread before another?

I have two Kafka topics, my_priorized_topic and my_not_so_priorized_topic. I want mutual exclusion on EventProcessor.doLogic, and I always want to handle messages from my_priorized_topic before messages from my_not_so_priorized_topic.

Can anyone give me some pointers on how to solve this with Kotlin, maybe with coroutines?

class EventProcessor {
  fun doLogic(message: String) {
    ... // code which cannot be parallelized
  }
}
class KafkaConsumers(private val eventProcessor: EventProcessor) {
  @KafkaConsumer(topic = "my_priorized_topic")
  fun consumeFromPriorizedTopic(message: String) {
    eventProcessor.doLogic(message)
  }

  @KafkaConsumer(topic = "my_not_so_priorized_topic")
  fun consumeFromNotSoPrioritizedTopic(message: String) {
    eventProcessor.doLogic(message)  
  }
}

Upvotes: 3

Views: 928

Answers (1)

Adib Faramarzi

Reputation: 4062

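You could create two Channels, one for high-priority and one for low-priority events. Then, to consume the events from the channels, use the coroutines select expression and put the high-priority channel first. select is biased toward its first clause, so when both channels have an event ready, the high-priority one is picked.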

Example (the String is the event):

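import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.selects.select

fun process(value: String) {
    // do what you want with the event
}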

suspend fun selectFromHighAndLow(highPriorityChannel: ReceiveChannel<String>, lowPriorityChannel: ReceiveChannel<String>): String =
        select<String> {
            highPriorityChannel.onReceive { value ->
                value
            }
            lowPriorityChannel.onReceive { value ->
                value
            }
        }


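val highPriorityChannel = Channel<String>()
val lowPriorityChannel = Channel<String>()

// selectFromHighAndLow is a suspend function, so the loop has to run inside a coroutine.
// consumeEvents is an illustrative name; call it from a single coroutine so events are processed one at a time.
suspend fun consumeEvents() {
    while (true) {
        process(selectFromHighAndLow(highPriorityChannel, lowPriorityChannel))
    }
}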
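
To see the ordering in action, here is a minimal runnable sketch, assuming kotlinx.coroutines is on the classpath. The names receivePrioritized and drainInPriorityOrder are made up for this illustration, and it uses buffered channels (Channel.UNLIMITED) rather than the rendezvous channels above, purely so that events can queue up before anything reads them.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// select is biased toward its first clause, so `high` wins whenever both channels have data.
suspend fun receivePrioritized(high: ReceiveChannel<String>, low: ReceiveChannel<String>): String =
    select<String> {
        high.onReceive { it }
        low.onReceive { it }
    }

// Hypothetical demo: queue events on both channels, then read them back in priority order.
fun drainInPriorityOrder(): List<String> = runBlocking {
    val high = Channel<String>(Channel.UNLIMITED)
    val low = Channel<String>(Channel.UNLIMITED)

    // Low-priority events are sent first on purpose.
    low.send("low-1")
    low.send("low-2")
    high.send("high-1")
    high.send("high-2")

    val received = mutableListOf<String>()
    repeat(4) { received += receivePrioritized(high, low) }
    received
}

fun main() {
    println(drainInPriorityOrder()) // prints [high-1, high-2, low-1, low-2]
}
```

Note that the priority only applies among events that are already waiting when select runs. A low-priority event that is currently being processed is not interrupted when a high-priority one arrives.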

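To send events to those channels, call channel.send(event). Since send is a suspending function, it has to be called from a coroutine (or wrapped in runBlocking when called from a regular blocking method).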
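
The Kafka listener methods in the question are ordinary blocking functions, so one possible way to connect them to the channels is sketched below. This is an assumption-heavy illustration, not a definitive implementation: PrioritizedDispatcher, submitHigh, submitLow and shutdown are hypothetical names, the EventProcessor here is a stand-in with a handled list added only so the result can be inspected, and it assumes that blocking the consumer thread in runBlocking until the event is picked up is acceptable. Because doLogic is only ever called from the single worker coroutine, it never runs in parallel.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Stand-in for the question's EventProcessor; `handled` exists only for inspection.
class EventProcessor {
    val handled = mutableListOf<String>()
    fun doLogic(message: String) {
        handled += message
    }
}

// Hypothetical helper: funnels both topics into one worker coroutine.
class PrioritizedDispatcher(private val eventProcessor: EventProcessor) {
    private val high = Channel<String>()
    private val low = Channel<String>()
    private val scope = CoroutineScope(Dispatchers.Default)

    // Single worker: the only place doLogic is called, so calls never overlap.
    private val worker = scope.launch {
        while (true) {
            // High-priority clause first; a closed channel yields null and ends the loop.
            val message = select<String?> {
                high.onReceiveCatching { it.getOrNull() }
                low.onReceiveCatching { it.getOrNull() }
            } ?: break
            eventProcessor.doLogic(message)
        }
    }

    // Called from the blocking listener methods; blocks until the worker takes the event.
    fun submitHigh(message: String) = runBlocking { high.send(message) }
    fun submitLow(message: String) = runBlocking { low.send(message) }

    // Closes both channels and waits for the worker to finish its current event.
    fun shutdown() = runBlocking {
        high.close()
        low.close()
        worker.join()
    }
}

fun main() {
    val processor = EventProcessor()
    val dispatcher = PrioritizedDispatcher(processor)
    dispatcher.submitLow("low-1")
    dispatcher.submitHigh("high-1")
    dispatcher.shutdown()
    println(processor.handled)
}
```

With this shape, consumeFromPriorizedTopic would call submitHigh(message) and consumeFromNotSoPrioritizedTopic would call submitLow(message) instead of calling doLogic directly.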

Upvotes: 3
