Reputation: 1059
Within my WebSocketConfig class:
@Configuration
@EnableWebSocketMessageBroker
class WebsocketConfig(<...>)
I override:
@Bean(name = ["csrfChannelInterceptor"])
fun csrfChannelInterceptor(): ChannelInterceptor {
    return object : ChannelInterceptor {
        override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
            val accessor =
                MessageHeaderAccessor.getAccessor(message, StompHeaderAccessor::class.java) ?: return null
            when (accessor.command) {
                StompCommand.SUBSCRIBE -> {
                    val chatId = extractChatIdFromDestination(accessor.destination)
                    // Prevent the user from subscribing to the channel if they are not a participant
                    if (!chatsService.existsByChatIdAndParticipantId(chatId, getCurrentUserId())) {
                        throw AccessDeniedException("No permissions found")
                    }
                }
                StompCommand.SEND -> {
                    // Same check as for SUBSCRIBE
                }
                else -> {
                    // Blank
                }
            }
            return super.preSend(message, channel)
        }
    }
}
However, when a user tries to subscribe or send to a specific channel, they get an error:
Failed to send message to MessageChannel in session 912<...>8:Failed to send message to ExecutorSubscribableChannel[clientInboundChannel]
This only happens if I call any blocking DB functions within preSend(). Is this not the right approach to deny SUBSCRIBE/SEND requests based on specific logic? How can I solve this issue? Cheers.
Upvotes: 1
Views: 61
Reputation: 168
The error you're encountering is likely caused by the blocking database operations inside the preSend() method of your ChannelInterceptor. Blocking calls tie up the thread that processes inbound messages, which can lead to thread contention, timeouts, or errors. The nested cause in the stack trace should confirm which exception actually escaped from preSend().
To address this, avoid performing blocking operations such as database calls directly within preSend(). Instead, you can delegate the database call to an asynchronous operation or use non-blocking techniques.
Here's a suggested approach:
Asynchronous Database Call: Perform the database call asynchronously using Spring's asynchronous capabilities. You can use the @Async annotation to mark your database call method as asynchronous.
Handle Database Call Result: Once the asynchronous database call completes, handle the result in a callback method. Based on the result, you can decide whether to allow or deny the message.
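As a rough illustration of the first step, here is a minimal sketch of an asynchronous wrapper around a blocking membership check. It uses only the JDK's CompletableFuture so it runs without Spring on the classpath; with Spring, an @Async method would play the same role. The class name AsyncMembershipLookup, the Long ID types, and the blockingCheck parameter are assumptions made for the example, not part of your code.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors

// Hypothetical wrapper: runs a blocking membership check on its own thread pool.
// The ID types (Long) are an assumption; use whatever your chat and user IDs are.
class AsyncMembershipLookup(
    private val blockingCheck: (chatId: Long, userId: Long) -> Boolean,
    private val pool: ExecutorService = Executors.newFixedThreadPool(4)
) {
    // Returns immediately; the future completes when the blocking check finishes.
    fun existsByChatIdAndParticipantIdAsync(chatId: Long, userId: Long): CompletableFuture<Boolean> =
        CompletableFuture.supplyAsync({ blockingCheck(chatId, userId) }, pool)

    // The pool uses non-daemon threads, so shut it down when done.
    fun shutdown() = pool.shutdown()
}

fun main() {
    // In-memory stand-in for the database table of (chatId, userId) pairs.
    val members = setOf(1L to 42L)
    val lookup = AsyncMembershipLookup({ chatId, userId -> (chatId to userId) in members })

    println(lookup.existsByChatIdAndParticipantIdAsync(1L, 42L).get()) // true
    println(lookup.existsByChatIdAndParticipantIdAsync(1L, 7L).get())  // false
    lookup.shutdown()
}
```

Running the check on a dedicated pool keeps slow queries off the thread that called the method, but the caller still has to decide what to do while the result is pending.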
Here's an updated version of your ChannelInterceptor with an asynchronous database call:
import org.springframework.messaging.Message
import org.springframework.messaging.MessageChannel
import org.springframework.messaging.simp.stomp.StompCommand
import org.springframework.messaging.simp.stomp.StompHeaderAccessor
import org.springframework.messaging.support.ChannelInterceptor
import org.springframework.security.access.AccessDeniedException // assumed: Spring Security's exception
import org.springframework.stereotype.Component
import org.springframework.util.concurrent.ListenableFuture
import org.springframework.util.concurrent.ListenableFutureCallback

@Component
class CustomChannelInterceptor(private val chatsService: ChatsService) : ChannelInterceptor {
    override fun preSend(message: Message<*>, channel: MessageChannel): Message<*>? {
        val accessor = StompHeaderAccessor.wrap(message)
        when (accessor.command) {
            StompCommand.SUBSCRIBE -> {
                val chatId = extractChatIdFromDestination(accessor.destination)
                // Perform database call asynchronously
                val future = chatsService.existsByChatIdAndParticipantIdAsync(chatId, getCurrentUserId())
                // Handle database call result
                future.addCallback(object : ListenableFutureCallback<Boolean> {
                    override fun onSuccess(result: Boolean?) {
                        // Runs after preSend() has returned, so this exception
                        // does not reach the client inbound channel
                        if (result != true) {
                            throw AccessDeniedException("No permissions found")
                        }
                    }

                    override fun onFailure(ex: Throwable) {
                        // Handle failure if needed
                    }
                })
            }
            StompCommand.SEND -> {
                // Same check as for SUBSCRIBE
            }
            else -> {
                // Blank
            }
        }
        return super.preSend(message, channel)
    }
    // Other methods
}
In this approach, existsByChatIdAndParticipantIdAsync() is an asynchronous method in your ChatsService that returns a ListenableFuture<Boolean>. Once the database call completes, the onSuccess() method of the callback is invoked with the result. Be aware that the callback runs after preSend() has already returned, so an exception thrown there cannot stop the frame that triggered the lookup; to actually reject a SUBSCRIBE or SEND, the allow/deny decision has to be available before preSend() returns.
Make sure to handle any potential failures appropriately in the onFailure() method of the callback.
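One thing worth checking with any callback-based check is when the decision becomes visible to the caller. The sketch below is plain Kotlin with the JDK's CompletableFuture, not Spring code: membershipLookup, callbackStyle, and boundedWait are hypothetical stand-ins for the service call and for two ways preSend() could use it. It contrasts throwing inside a callback with waiting for the result under a timeout and denying on error.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for the asynchronous membership lookup.
fun membershipLookup(isParticipant: Boolean): CompletableFuture<Boolean> =
    CompletableFuture.supplyAsync {
        Thread.sleep(50) // simulate database latency
        isParticipant
    }

// Callback style: the function returns before the lookup finishes, and the
// exception thrown in the callback only fails the dependent future.
fun callbackStyle(isParticipant: Boolean): String {
    membershipLookup(isParticipant).thenAccept { allowed ->
        if (!allowed) throw IllegalStateException("No permissions found")
    }
    return "forwarded"
}

// Bounded wait: the decision is known before returning. Timeouts and lookup
// errors are treated as a denial (fail closed).
fun boundedWait(isParticipant: Boolean, timeoutMs: Long = 1000): String {
    val allowed = try {
        membershipLookup(isParticipant).get(timeoutMs, TimeUnit.MILLISECONDS)
    } catch (e: Exception) {
        false
    }
    return if (allowed) "forwarded" else "denied"
}

fun main() {
    println(callbackStyle(isParticipant = false)) // forwarded
    println(boundedWait(isParticipant = false))   // denied
    println(boundedWait(isParticipant = true))    // forwarded
}
```

The bounded wait still occupies the calling thread for up to the timeout, so it is a trade-off rather than a free fix; caching membership so the check rarely touches the database is another option under the same constraint.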
Upvotes: 0