kioli

Reputation: 725

Wait for several Flows to finish before proceeding

I need to upload an image to a server. I cannot use any other library for this, so I split the base64-encoded image into chunks and upload each of them.

I am using Kotlin coroutine Flows for this. I first make a call (which returns a Flow) to obtain an image ID, which I then need to include in every upload request.

Here are the functions I use to upload the image:

fun submitImage(payload: Payload): Flow<String> {
    val request = requestBuilder.buildUploadImageRequest(payload)
    return client.execute(request)
        .serviceFlow({ response ->
            val imageId = response.body.id
            uploadImage(payload.imageBase64, imageId)
            imageId
        }, { response ->
            throw MyServerError("Error ${response.error}")
        })
}

private fun uploadImage(imageBase64: String, imageId: String) {
    val chunks = divideEncodedImageInChunksOfSize(imageBase64)
    var v = 1
    for (chunk in chunks) {
        val payload = generatePayload(imageId, v, chunk, false)
        submitImageChunk(payload)
        v++
    }
    val payload = generatePayload(imageId, v, "", true)
    submitImageChunk(payload)
}

private fun submitImageChunk(payload: JSONObject): Flow<Unit> {
    val request = requestBuilder.buildUploadImageChunkRequest(payload)
    return client.execute(request)
        .serviceFlow({ }, { response ->
            throw MyHttpError(response)
        })
}

I also make use of the following utility functions:

// Extension function to handle Flows and their activation
internal fun MyHttpClient.execute(request: MyHttpRequest): Flow<MyHttpResponse> {
    return flow {
        val deferred = CompletableDeferred<MyHttpResponse>()
        executeHttp(request, object : MyHttpListener {
            override fun onSuccess(response: MyHttpResponse) {
                deferred.complete(response)
            }

            override fun onFailure(response: MyHttpResponse) {
                deferred.completeExceptionally(MyHttpError(response))
            }
        })
        emit(deferred.await())
    }
}

// Extension function to catch exceptions AND to check if the response body is null
internal fun <T> Flow<MyHttpResponse>.serviceFlow(
    onSuccess: (response: MyHttpResponse) -> T,
    onError: (response: MyHttpResponse) -> Unit
) = flatMapConcat { response ->
    flowOf(response)
        .map { res ->
            res.body?.let { it ->
                onSuccess(res)
            } ?: throw MyParseError("MyHttpResponse has a null body")
        }
        .catchException<JSONException, T> { e ->
            throw MyParseError("Parsing exception $e")
        }
}.catchException<MyHttpError, T> { e ->
    onError(e.response)
}

// Function leveraging OkHttpClient to make a HTTPRequest
internal fun executeHttp { ... }

I think the problem is that submitImage returns after launching all the sub-Flows for the image upload, but does not wait for them to complete. I am not sure which construct Kotlin coroutines offer for a use case like this. Can anybody help me out?

Upvotes: 3

Views: 4450

Answers (2)

kioli

Reputation: 725

Thank you musafee for pointing me in the right direction.

In the end, the problem was that I was creating these flows in the uploadImage function but never actually calling collect on them, so they were never started.
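To illustrate why an uncollected flow does nothing, here is a minimal sketch. It assumes kotlinx.coroutines is on the classpath; fakeChunkUpload and executed are hypothetical stand-ins for submitImageChunk, not part of the code in the question.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical log of which flow bodies have actually run.
val executed = mutableListOf<Int>()

// Hypothetical stand-in for submitImageChunk: building the flow runs nothing yet.
fun fakeChunkUpload(index: Int): Flow<Unit> = flow {
    executed += index // runs only once the flow is collected
    emit(Unit)
}

fun main() = runBlocking {
    fakeChunkUpload(1)           // flow created but never collected: the body never runs
    println(executed)            // []
    fakeChunkUpload(2).collect() // terminal operator: runs the body and suspends until it completes
    println(executed)            // [2]
}
```

Flows built with the flow { } builder are cold, so the call to fakeChunkUpload(1) has no effect, which mirrors what happened with the discarded return value of submitImageChunk.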

The solution I opted for is to return the list of flows created there to the calling function, changing the return type of submitImage from Flow<String> to Flow<List<Flow<Unit>>>, and to collect them from that upper level.
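A rough sketch of that shape, assuming kotlinx.coroutines is available. All names here (fakeSubmitChunk, buildChunkUploads, fakeSubmitImage, the "image-42" ID and the "<final>" marker) are hypothetical stand-ins for the real functions and payloads, and the HTTP layer is replaced by a list that records what was "uploaded".

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical record of completed uploads, standing in for the server.
val uploaded = mutableListOf<String>()

// Stand-in for submitImageChunk: a cold flow that "uploads" when collected.
fun fakeSubmitChunk(imageId: String, chunk: String): Flow<Unit> = flow {
    uploaded += "$imageId:$chunk"
    emit(Unit)
}

// Stand-in for uploadImage: returns the chunk flows instead of discarding them.
fun buildChunkUploads(imageId: String, chunks: List<String>): List<Flow<Unit>> =
    chunks.map { fakeSubmitChunk(imageId, it) } + fakeSubmitChunk(imageId, "<final>")

// Stand-in for submitImage: emits the list of pending chunk uploads.
fun fakeSubmitImage(chunks: List<String>): Flow<List<Flow<Unit>>> =
    flowOf("image-42").map { imageId -> buildChunkUploads(imageId, chunks) }

fun main() = runBlocking {
    fakeSubmitImage(listOf("a", "b")).collect { uploads ->
        // Collect each chunk flow in order; each call suspends until that upload is done.
        uploads.forEach { it.collect() }
    }
    println(uploaded) // [image-42:a, image-42:b, image-42:<final>]
}
```

Because the inner flows are collected one after another, the outer collect only returns once every chunk has been processed.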

Upvotes: 1

user12281411


I think you should use WorkManager and consider its work-chaining feature.

With Flow, try this:

private suspend fun uploadImage(imageBase64: String, imageId: String) {
    withContext(Dispatchers.IO) {
        val chunks = divideEncodedImageInChunksOfSize(imageBase64)
        var v = 1
        for (chunk in chunks) {
            val payload = generatePayload(imageId, v, chunk, false)
            // Suspends until this chunk has been uploaded
            submitImageChunk(payload)
            v++
        }
        // Final payload: empty chunk with the last argument set to true
        val payload = generatePayload(imageId, v, "", true)
        submitImageChunk(payload)
    }
}

private suspend fun submitImageChunk(payload: JSONObject) {
    val request = requestBuilder.buildUploadImageChunkRequest(payload)
    // client.execute returns a cold Flow; collect() starts it and suspends until it completes
    client.execute(request).collect()
}

Upvotes: 0
