haha
haha

Reputation: 131

How to implement NIO Socket (client) using Kotlin coroutines in Java Code?

I want to use Kotlin(v1.3.0) coroutines & java.nio.channels.SocketChannel (NIO) to replace Socket connect (blocking IO) in Android. because this can save many number of threads.

The code below can't run because of job.await() is suspending function in Kotlin, It just can be called in Ktolin coroutines block. like launch{..}, async{..}.

// this function will be called by Java Code
fun connect(address: InetSocketAddress, connectTimeout: Int): SocketChannel {

    // Start a new connection
    // Create a non-blocking socket channel
    val socketChannel = SocketChannel.open()
    socketChannel.configureBlocking(false)

    // async calls NIO connect function
    val job = GlobalScope.async(oneThreadCtx) {
        aConnect(socketChannel, address)
    }

    // I what to suspend(NOT block) current Java Thread, until connect is success
    job.await()

    return socketChannel
}

But, I tried to use runBlocking{..} make this function as normal function in Java. but job.await blocked current Java Thread, Not suspend.

so, how should I implement this function with Kotlin(v1.3.0) coroutines?

Upvotes: 13

Views: 11308

Answers (2)

PatTheGamer
PatTheGamer

Reputation: 491

As Marko pointed out your code will still end up blocking a thread even when that blocking operation is in the async coroutine. To truly get the asynchronous behavior you desire with Java and Kotlin you need to use the Async version of Socket Channel

With this, you get true asynchronous socket handling. With that class and Kotlin's suspendCoroutine builder method, you can turn the async handlers into suspendable calls.

Here is an example of implementing it for read:

class TcpSocket(private val socket: AsynchronousSocketChannel) {
    suspend fun read(buffer: ByteBuffer): Int {
        return socket.asyncRead(buffer)
    }

    fun close() {
        socket.close()
    }

    private suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
        return suspendCoroutine { continuation ->
           this.read(buffer, continuation, ReadCompletionHandler)
        }
    }

    object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
        override fun completed(result: Int, attachment: Continuation<Int>) {
            attachment.resume(result)
        }

        override fun failed(exc: Throwable, attachment: Continuation<Int>) {
            attachment.resumeWithException(exc)
        }
    }
}

You could choose to remove the wrapping that I'm doing here and just expose an asyncRead method on AsynchronousSocketChannel like so:

suspend fun AsynchronousSocketChannel.asyncRead(buffer: ByteBuffer): Int {
    return suspendCoroutine { continuation ->
       this.read(buffer, continuation, ReadCompletionHandler)
    }
}

object ReadCompletionHandler : CompletionHandler<Int, Continuation<Int>> {
    override fun completed(result: Int, attachment: Continuation<Int>) {
        attachment.resume(result)
    }

    override fun failed(exc: Throwable, attachment: Continuation<Int>) {
        attachment.resumeWithException(exc)
    }
}

It's all a matter of taste and what exactly your design goals are. You should be able to implement a similar method for the initial connect as I did here for read.

Upvotes: 10

Marko Topolnik
Marko Topolnik

Reputation: 200148

// I what to suspend(NOT block) current Java Thread, until connect is success
job.await()

This is not a realistic expectation. From the perspective of Java, a suspend fun suspends its execution by returning a special constant, COROUTINE_SUSPENDED. You need the Kotlin compiler to hide that from you and allow you to write regular-looking code that's suspendable.

Your code falls short of non-blocking suspension even from the Kotlin perspective because it uses a blocking call to connect. Submitting that call to another thread doesn't make it non-blocking.

What your code does is completely equivalent to submitting a job to a Java executor service and then awaiting on its result. You can for example use CompletableFuture.supplyAsync.

Upvotes: 1

Related Questions