mehdi dorreh
mehdi dorreh

Reputation: 303

How to avoid concurrency issues with Kotlin coroutines?

I am going to implement a chat feature in the android application. In order to do that, I fetch chat messages every five seconds from the server by a coroutine flow. The problem is when I want to send a message sometimes the server receives two concurrent requests and it returns an error. How should I make sure that these requests run sequentially in my chat repository? Here is my chat repository:

class ChatRepositoryImpl @Inject constructor(
    private val api: ApolloApi,
    private val checkTokenIsSetDataStore: CheckTokenIsSetDataStore
) : ChatRepository {

    override fun chatMessages(
        lastIndex: Int,
        limit: Int,
        offset: Int,
        channelId: Int,
    ): Flow<Resource<ChatMessages>> = flow {
        var token = ""
        checkTokenIsSetDataStore.get.first {
            token = it
            true
        }
        while (true) {
            val response = ChatMessagesQuery(
                lastIndex = Input.fromNullable(lastIndex),
                limit = Input.fromNullable(limit),
                offset = Input.fromNullable(offset),
                channelId
            ).let {
                api.getApolloClient(token)
                    .query(it)
                    .await()
            }

            response.data?.let {
                emit(
                    Resource.Success<ChatMessages>(
                        it.chatMessages
                    )
                )
            }
            if (response.data == null)
                emit(Resource.Error<ChatMessages>(message = response.errors?.get(0)?.message))
            delay(5000L)
        }
    }.flowOn(Dispatchers.IO)

    override fun chatSendText(channelId: Int, text: String): Flow<Resource<ChatSendText>> = flow {
        var token = ""
        checkTokenIsSetDataStore.get.first {
            token = it
            true
        }

        val response = ChatSendTextMutation(
            channelId = channelId,
            text = text
        ).let {
            api.getApolloClient(token)
                .mutate(it)
                .await()
        }

        response.data?.let {
            return@flow emit(
                Resource.Success<ChatSendText>(
                    it.chatSendText
                )
            )
        }
        return@flow emit(Resource.Error<ChatSendText>(message = response.errors?.get(0)?.message))
    }.flowOn(Dispatchers.IO)
}

Upvotes: 0

Views: 955

Answers (1)

broot
broot

Reputation: 28452

One way to limit concurrency is to use utils like Mutex or Semaphore. We can very easily solve your problem with mutex:

class ChatRepositoryImpl ... {
    private val apolloMutex = Mutex()

    override fun chatMessages(...) {
        ...
        apolloMutex.withLock {
            api.getApolloClient(token)
                .query(it)
                .await()
        }
        ...
    }

    override fun chatSendText(...) {
        ...
        apolloMutex.withLock {
            api.getApolloClient(token)
                .mutate(it)
                .await()
        }
        ...
    }

However, this problem should not be really fixed on the client side, but on the server side. Your attempted solution doesn't protect you against concurrent requests entirely. If for some reasons two instances of the application has the same token or if the user attempts to manipulate your application, it could still send concurrent requests.

If you can't easily fix the problem properly, you can apply the same fix on the server side that you intend to apply on the client side. Just handle requests or part of requests sequentially. It is more error-proof and also more performant, because this way only part of the whole request time has to be done sequentially.

Upvotes: 2

Related Questions