jonderry

Reputation: 23643

How can I use Scala's type system to remove an unnecessary type parameter and create a generic Thrift API?

I'm trying to create a generic adapter for AsyncClient in Thrift for Scala that will let an RPC implement Function1[A, Future[B]] rather than use Thrift's less composable built-in callback approach. This is made challenging by the fact that the generated APIs for Thrift classes are largely non-generic, so creating a simple generic wrapper for an arbitrary Thrift client is not straightforward. Consider the following example:

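  import java.util.concurrent.Executors
  import scala.concurrent.{ExecutionContext, Future, Promise}
  import org.apache.thrift.async.{AsyncMethodCallback, TAsyncClientManager}
  import org.apache.thrift.protocol.TBinaryProtocol
  import org.apache.thrift.transport.TNonblockingSocket

  class AsyncThriftClient[A, B, X](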
    requestConsumer: (A, AsyncMethodCallback[X]) => Unit,
    callbackResultConverter: X => B) extends Function1[A, Future[B]] {

    private class Callback(p: Promise[B])
      extends AsyncMethodCallback[X] {

      def onComplete(x: X): Unit = {
        try {
          val result = callbackResultConverter(x)
          println("from server: " + result)
          p.success(result)
        } catch {
          case e: Exception => p.failure(e)
        }
      }

      def onError(e: Exception): Unit = {
        p.failure(e)
      }
    }

    def apply(request: A): Future[B] = {
      val p = Promise[B]()
      requestConsumer(request, new Callback(p))
      p.future
    }
  }

  def main(args: Array[String]): Unit = {
    try {
      val ex = Executors.newSingleThreadExecutor
      implicit val ec = ExecutionContext.fromExecutor(ex)

      val client = new ServiceStatus.AsyncClient(
        new TBinaryProtocol.Factory,
        new TAsyncClientManager,
        new TNonblockingSocket("localhost", 9090))
      val fun = new AsyncThriftClient[ //
      StatusRequest, StatusResponse, ServiceStatus.AsyncClient.status_call](
        client.status(_, _),
        _.getResult)

      val request = new StatusRequest("say hi")
      val fut = fun(request)
      fut.onSuccess { case r => println(s"succ $r") }
      fut.onFailure { case e => println(s"erro $e") }
      Thread.sleep(1000)
      ex.shutdown()
    } catch {
      case e: Exception => e.printStackTrace()
    }
  }

This seems like a reasonable first attempt, but note the type parameter X which is bound to ServiceStatus.AsyncClient.status_call. It seems like I shouldn't need to supply this because it's not important for any of the method signatures in AsyncThriftClient. What I really need is to say that there should be "some type" X such that the following constructor parameters agree, which sounds a lot like existential types. The resulting invocation site would look like the following:

      val client = new ServiceStatus.AsyncClient(
        new TBinaryProtocol.Factory,
        new TAsyncClientManager,
        new TNonblockingSocket("localhost", 9090))
      val fun = new AsyncThriftClient[StatusRequest, StatusResponse](
        client.status(_, _),
        _.getResult)

and the compiler would figure out that there is a suitable X that lets client.status(_, _) and _.getResult match up. Is there a way to achieve this? (By the way, a follow-up task is to encapsulate the instantiation of client, which would probably require a similar technique.)

Upvotes: 2

Views: 177

Answers (1)

Alex Cruise

Reputation: 7979

I would wrap the whole thing up in an abstract API proxy thing, and leave the specification of the intermediate Thrift type, and the implementation of the type translation functions, to the concrete implementation. Something like this:

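import scala.concurrent.{Future, Promise}
import org.apache.thrift.async.{AsyncMethodCallback, TAsyncClientManager}
import org.apache.thrift.protocol.{TBinaryProtocol, TProtocolFactory}
import org.apache.thrift.transport.{TNonblockingSocket, TNonblockingTransport}

trait AsyncThriftAPI[A, B] {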
  protected type X // Intermediate Thrift type; not for use outside API implementations

  // Implementor must specify these. 
  protected def sendRequest(in: A, callback: AsyncMethodCallback[X]): Unit
  protected def convertResult(intermediate: X): B

  // Note that up here, we never use the client directly, 
  // so let's not needlessly couple this API proxy pattern 
  // to too many transport dependencies

  // final because of "must be abstract or final" dogma :)
  final def apply(request: A): Future[B] = {
    val p = Promise[B]()
    sendRequest(request, new Callback(p))
    p.future
  }

  private class Callback(p: Promise[B]) extends AsyncMethodCallback[X] {
    def onComplete(x: X): Unit = {
      try {
        val result = convertResult(x)
        println("from server: " + result)
        p.success(result)
      } catch {
        case e: Exception => p.failure(e)
      }
    }

    def onError(e: Exception): Unit = {
      p.failure(e)
    }
  }
}

Now to implement it:

final class StatusAPI(implicit val      transport: TNonblockingTransport,
                               val  clientManager: TAsyncClientManager,
                               val protocolFactory: TProtocolFactory) 
    extends AsyncThriftAPI[StatusRequest, StatusResponse] 
{
  protected type X = ServiceStatus.AsyncClient.status_call

  // Lazy so that we don't bother to spin it up until someone actually calls the API
  lazy val client = new ServiceStatus.AsyncClient(protocolFactory,
                                                  clientManager,
                                                  transport)

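  // The trait's type parameter A is not in scope here, so name the concrete request type
  protected def sendRequest(in: StatusRequest, callback: AsyncMethodCallback[X]): Unit = client.status(in, callback)

  protected def convertResult(intermediate: X): StatusResponse = intermediate.getResult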
}

At the call site:

// Have these in scope somewhere near the root of whichever component
// needs to connect to a variety of Thrift servers
implicit val protoFactory  = new TBinaryProtocol.Factory
implicit val clientManager = new TAsyncClientManager
implicit val transport     = new TNonblockingSocket("localhost", 9090)

val statusApi = new StatusAPI()

statusApi(new StatusRequest(...)) // returns Future[StatusResponse]

I haven't attempted to compile this, so please let me know if there are any mistakes.
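For anyone who wants to try the abstract-type-member pattern without a Thrift toolchain, here is a minimal self-contained sketch. Everything in it is a hypothetical stand-in invented for illustration: `Callback` plays the role of `AsyncMethodCallback`, `EchoCall` plays the role of a generated `*_call` class, and `EchoClient` plays the role of a generated async client. The stub client completes the callback synchronously, which a real Thrift client would not do, so treat this as a demonstration of the typing only.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for Thrift's AsyncMethodCallback.
trait Callback[T] {
  def onComplete(response: T): Unit
  def onError(e: Exception): Unit
}

// Hypothetical stand-in for a generated *_call class such as status_call.
final class EchoCall(payload: String) {
  def getResult: String = payload.toUpperCase
}

// Hypothetical stand-in for a generated async client; it answers immediately.
final class EchoClient {
  def echo(request: String, callback: Callback[EchoCall]): Unit =
    callback.onComplete(new EchoCall(request))
}

// Same shape as the trait above: X is an abstract type member, not a type parameter.
trait AsyncApi[A, B] extends (A => Future[B]) {
  protected type X
  protected def sendRequest(in: A, callback: Callback[X]): Unit
  protected def convertResult(intermediate: X): B

  final def apply(request: A): Future[B] = {
    val p = Promise[B]()
    sendRequest(request, new Callback[X] {
      def onComplete(x: X): Unit =
        try p.success(convertResult(x))
        catch { case e: Exception => p.failure(e) }
      def onError(e: Exception): Unit = p.failure(e)
    })
    p.future
  }
}

// The concrete implementation is the only place the intermediate type is named.
final class EchoApi(client: EchoClient) extends AsyncApi[String, String] {
  protected type X = EchoCall
  protected def sendRequest(in: String, callback: Callback[X]): Unit =
    client.echo(in, callback)
  protected def convertResult(intermediate: X): String = intermediate.getResult
}

object EchoDemo {
  def run(): String = {
    // Callers only ever see String => Future[String]; EchoCall never leaks out.
    val api: String => Future[String] = new EchoApi(new EchoClient)
    Await.result(api("say hi"), 1.second)
  }
}
```

With these stubs, `EchoDemo.run()` returns `"SAY HI"`, and the call site is typed purely in terms of the request and response types.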

If it were me I'd probably want to bundle a bunch of different, related API calls into one API proxy, so that might call for an additional layer of abstraction. :)
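One possible shape for that extra layer, as a rough sketch rather than a tested design: a single proxy class that exposes several calls as plain functions. Note that this swaps the abstract type member for a method-level type parameter `X` on a private helper, which the compiler infers from the first argument list, so each call can have its own intermediate type. `Callback`, `PingCall`, `LengthCall`, `DemoClient`, `DemoProxy` and `ProxyDemo` are hypothetical names made up for the example, and the stub client replies synchronously, unlike a real Thrift client.

```scala
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical stand-in for Thrift's AsyncMethodCallback.
trait Callback[T] {
  def onComplete(response: T): Unit
  def onError(e: Exception): Unit
}

// Hypothetical stand-ins for two generated *_call classes and their client.
final class PingCall(msg: String) { def getResult: String = s"pong: $msg" }
final class LengthCall(msg: String) { def getResult: Int = msg.length }
final class DemoClient {
  def ping(request: String, cb: Callback[PingCall]): Unit =
    cb.onComplete(new PingCall(request))
  def length(request: String, cb: Callback[LengthCall]): Unit =
    cb.onComplete(new LengthCall(request))
}

final class DemoProxy(client: DemoClient) {
  // X is inferred from `send`, so it never appears in the proxy's public API.
  private def call[A, X, B](send: (A, Callback[X]) => Unit)(convert: X => B): A => Future[B] =
    request => {
      val p = Promise[B]()
      send(request, new Callback[X] {
        def onComplete(x: X): Unit =
          try p.success(convert(x))
          catch { case e: Exception => p.failure(e) }
        def onError(e: Exception): Unit = p.failure(e)
      })
      p.future
    }

  // Each related call becomes one function-valued member.
  val ping: String => Future[String] = call(client.ping)(_.getResult)
  val length: String => Future[Int]  = call(client.length)(_.getResult)
}

object ProxyDemo {
  def run(): (String, Int) = {
    val proxy = new DemoProxy(new DemoClient)
    (Await.result(proxy.ping("hi"), 1.second),
     Await.result(proxy.length("hello"), 1.second))
  }
}
```

With these stubs, `ProxyDemo.run()` returns `("pong: hi", 5)`. Against real generated clients the method references may need adjusting (for example if the generated methods are overloaded), so take the wiring as an assumption.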

Upvotes: 3
