Reputation: 357
I found this repository at GitHub Long Polling Redis
So in spring boot, we can use a deferred request to hold the client request for several seconds (AppMsgController.java#L72)
and it will send back to the client until the deferred request is filled with the result (AppMsgHandler.java#L74) or until it reaches the timeout.
I also notice this mechanism also can be implemented with CompetableFuture in java using completeOnTimeout.
But I wonder can we use something similar in Kotlin Coroutines?
Upvotes: 1
Views: 3306
Reputation: 37710
As @Spitzbueb said, you could do something similar with CompletableDeferred
.
However, if you don't need to support the clear()
and count()
methods, you could also probably simplify by replacing the ConcurrentHashMap
with a simple MutableSharedFlow<Unit>
that broadcasts "pings" from redis.
In onMessage
, you could emit Unit
into the mutable shared flow to notify subscribers, and then you can simply implement your request mechanism by awaiting the first element on the shared flow and making the readSubset
request:
class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {
private val events = MutableSharedFlow<Unit>()
suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
val currentMsgs = appMsgRepo.readSubset(start)
if (currentMsgs.isNotEmpty()) {
return currentMsgs
}
val newMessages = withTimeoutOrNull(timeoutMillis) {
events.first()
appMsgRepo.readSubset(start)
}
return newMessages ?: emptyList()
}
override fun onMessage(message: Message, pattern: ByteArray?) {
LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
events.tryEmit(Unit)
}
companion object {
private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
private val UTF8: Charset = StandardCharsets.UTF_8
}
}
The controller can then simply call requestMessages
(provided you make your controller use suspend
functions with Spring WebFlux).
Upvotes: 2
Reputation: 5881
In Kotlin coroutines there is the Deferred
type, which is similar to CompletableFuture
in the sense that it represents a value that is not yet available but probably will be in the future (if no error occurs/exception is thrown). @Joffrey pointed out that there is also a CompletableDeferred
, which is even closer to ComplatableFuture
enabling the user to manually call complete
or exceptionallyComplete
.
Deferreds can easily be created with the async
extension function on CoroutineScope
. If you want to set a timeout, Kotlin has you covered with the withTimeout
function that cancels the block of code after a given time.
Note that withTimeout
should be inside async
and not the other way around.
Take a look at this example: https://pl.kotl.in/uYe12ds7g
Upvotes: 1