Reputation: 725
I need to upload an image to a server, and to do so I cannot use other libraries, but rather I split it (base64 encoded) in chunks and upload them all.
I am using Kotlin coroutine Flows for it, and what I currently do is to do a first call (returning a flow) to obtain a image ID that I need to append in all the upload requests
Here are the 2 functions I use to upload the image
fun submitImage(payload: Payload): Flow<String> {
val request = requestBuilder.buildUploadImageRequest(payload)
return client.execute(request)
.serviceFlow({ response ->
val imageId = response.body.id
uploadImage(payload.imageBase64, imageId)
imageId
}, { response ->
throw MyServerError("Error ${response.error}")
})
}
private fun uploadImage(imageBase64: String, imageId: String) {
val chunks = divideEncodedImageInChunksOfSize(imageBase64)
var v = 1
for (chunk in chunks) {
val payload = generatePayload(imageId, v, chunk, false)
submitImageChunk(payload)
v++
}
val payload = generatePayload(imageId, v, "", true)
submitImageChunk(payload)
}
private fun submitImageChunk(payload: JSONObject): Flow<Unit> {
val request = requestBuilder.buildUploadImageChunkRequest(payload)
return client.execute(request)
.serviceFlow({ }, { response ->
throw MyHttpError(response)
})
}
And I make use of the following utility functions
// Extension function to handle Flows and their activation
internal fun MyHttpClient.execute(request: MyHttpRequest): Flow<MyHttpResponse> {
return flow {
val deferred = CompletableDeferred<MyHttpResponse>()
executeHttp(request, object : MyHttpListener {
override fun onSuccess(response: MyHttpResponse) {
deferred.complete(response)
}
override fun onFailure(response: MyHttpResponse) {
deferred.completeExceptionally(MyHttpError(response))
}
})
emit(deferred.await())
}
}
// Extension function to catch exceptions AND to check if the response body is null
internal fun <T> Flow<MyHttpResponse>.serviceFlow(
onSuccess: (response: MyHttpResponse) -> T,
onError: (response: MyHttpResponse) -> Unit
) = flatMapConcat { response ->
flowOf(response)
.map { res ->
res.body?.let { it ->
onSuccess(res)
} ?: throw MyParseError("MyHttpResponse has a null body")
}
.catchException<JSONException, T> { e ->
throw MyParseError("Parsing exception $e")
}
}.catchException<MyHttpError, T> { e ->
onError(e.response)
}
// Function leveraging OkHttpClient to make a HTTPRequest
internal fun executeHttp { ... }
The problem I think is due to the fact that the function submitImage
returns after launching all the sub-Flows for the upload of the image but it does not wait for all of them to complete.
I am not sure what construct Kotlin coroutines has for a use-case like this, can anybody help me out?
Upvotes: 3
Views: 4450
Reputation: 725
Thank you musafee to put me in the right direction.
The answer in the end was that I was creating these flows in the uploadImage
function but I was never actually calling collect
on them, therefore they remained unlaunched.
The solution I opted for is to return the list of flows created there to the calling function, and from there to change the return type of the submitImage
function from a Flow<String>
to a Flow<List<Flow<Unit>>>
, and from that upper level trigger them
Upvotes: 1
Reputation:
I think you should use WorkManager and considered the chain woker feature.
With Flow feature, try this:
private suspend fun uploadImage(imageBase64: String, imageId: String) {
withContext(Dispatchers.IO){
val chunks = divideEncodedImageInChunksOfSize(imageBase64)
var v = 1
for (chunk in chunks) {
val payload = generatePayload(imageId, v, chunk, false)
submitImageChunk(payload)
v++
}
val payload = generatePayload(imageId, v, "", true)
submitImageChunk(payload).await();
}
private suspend fun submitImageChunk(payload: JSONObject): Deferred<Unit> {
val request = requestBuilder.buildUploadImageChunkRequest(payload)
return client.execute(request);
}
Upvotes: 0