jihedMaster
jihedMaster

Reputation: 331

Vertx - Is it possible to combine RxJava and Hibernate

I wnted to know if its possible to use this:

https://github.com/vert-x3/vertx-rx

with

https://github.com/vert-x3/vertx-jdbc-client

and you add Hibernate for the named queries / DAO access.

How to make Hibernate non blocking I/O?

Upvotes: 3

Views: 2720

Answers (1)

Клаус Шварц
Клаус Шварц

Reputation: 3268

Recently I faced exactly the same problem and wrote that library: https://github.com/eraga/rxjpa2

Here is an example how you can utilize it with vertx.

Assuming you store User entities in DB which you accessing with JPA2 Hibernate. Then you will have UsersDAOServiceImpl like that:

package jpa.rxjava.vertx.sample

import io.vertx.core.AsyncResult
import io.vertx.core.Handler
import io.vertx.reactivex.CompletableHelper
import io.vertx.reactivex.SingleHelper
import data.User // That's your data class
import net.eraga.rxjpa2.*
import javax.persistence.EntityManager
import javax.persistence.EntityManagerFactory

class UsersDAOServiceImpl(
        emf: EntityManagerFactory,
        readyHandler: Handler<AsyncResult<UsersDAOService>>
) : UsersDAOService {

    private lateinit var entityManager: EntityManager

    init {
        emf.rxCreateEntityManager()
                .map {
                    entityManager = it
                    this
                }
                .subscribe(SingleHelper.toObserver(readyHandler))

    }


    override fun fetchByLogin(login: String, resultHandler: Handler<AsyncResult<User>>) {
        entityManager.createQuery("SELECT i From User i where login = :login", User::class.java)
                .setParameter("login", login)
                .rxSingleResult<User>()
                .subscribe(SingleServiceHelper.toObserver(resultHandler))
    }

    override fun fetchAll(resultHandler: Handler<AsyncResult<List<User>>>) {
        entityManager.createQuery("SELECT i From User i order by modified desc", User::class.java)
                .rxResultList()
                .subscribe(SingleServiceHelper.toObserver(resultHandler))
    }

    override fun fetchById(id: Int, resultHandler: Handler<AsyncResult<User>>) {
        entityManager.rxFind(User::class.java, id)
                .subscribe(SingleServiceHelper.toObserver(resultHandler))
    }

    override fun create(user: User, resultHandler: Handler<AsyncResult<Void>>) {
        entityManager
                .rxPersist(user)
                .subscribe(CompletableHelper.toObserver(resultHandler))
    }

    override fun save(user: User, resultHandler: Handler<AsyncResult<Void>>) {
        entityManager
                .rxMerge(user)
                .toCompletable()
                .subscribe(CompletableHelper.toObserver(resultHandler))
    }

    override fun delete(id: Int, resultHandler: Handler<AsyncResult<Void>>) {
        val user = User().apply { this.id = id }
        entityManager
                .rxRemove(user)
                .subscribe(CompletableHelper.toObserver(resultHandler))
    }
}

Notice SingleServiceHelper class that helps to translate Singles to SingleObservers:

package jpa.rxjava.vertx.sample

import com.fasterxml.jackson.core.type.TypeReference
import io.reactivex.Single
import io.reactivex.SingleObserver
import io.reactivex.SingleTransformer
import io.reactivex.annotations.NonNull
import io.reactivex.disposables.Disposable
import io.vertx.core.AsyncResult
import io.vertx.core.Future
import io.vertx.core.Handler
import io.vertx.core.buffer.Buffer
import io.vertx.reactivex.core.json.SingleUnmarshaller
import io.vertx.serviceproxy.ServiceException
import java.util.concurrent.atomic.AtomicBoolean
import java.util.function.Function


object SingleServiceHelper {
    /**
     * Adapts an Vert.x `Handler<AsyncResult<T>>` to an RxJava2 [SingleObserver].
     *
     *
     * The returned observer can be subscribed to an [Single.subscribe].
     *
     * @param handler the handler to adapt
     * @return the observer
     */
    fun <T> toObserver(handler: Handler<AsyncResult<T>>): SingleObserver<T> {
        val completed = AtomicBoolean()
        return object : SingleObserver<T> {
            override fun onSubscribe(@NonNull d: Disposable) {}
            override fun onSuccess(@NonNull item: T) {
                if (completed.compareAndSet(false, true)) {
                    handler.handle(Future.succeededFuture(item))
                }
            }

            override fun onError(error: Throwable) {
                if (completed.compareAndSet(false, true)) {
                    val ex = ServiceException(-1,error.message).apply {
                        initCause(error)
                    }
                    handler.handle(Future.failedFuture(ex))
                }
            }
        }
    }

    fun <T> unmarshaller(mappedType: Class<T>): SingleTransformer<Buffer, T> {
        return SingleUnmarshaller(Function.identity(), mappedType)
    }

    fun <T> unmarshaller(mappedTypeRef: TypeReference<T>): SingleTransformer<Buffer, T> {
        return SingleUnmarshaller(Function.identity(), mappedTypeRef)
    }
}

That's how you instantiate EntityManagerFactory using RxPersistence when you start you App verticle:

package jpa.rxjava.vertx.sample

import io.reactivex.plugins.RxJavaPlugins
import io.vertx.core.DeploymentOptions
import io.vertx.core.Future
import io.vertx.core.VertxOptions
import io.vertx.core.json.JsonObject
import io.vertx.reactivex.core.AbstractVerticle
import io.vertx.reactivex.core.RxHelper
import io.vertx.reactivex.core.Vertx
import net.eraga.rxjpa2.RxPersistence
import org.slf4j.LoggerFactory
import javax.persistence.EntityManagerFactory

class App : AbstractVerticle() {
    private val log = LoggerFactory.getLogger(this.javaClass)

    @Throws(Exception::class)
    override fun start(startFuture: Future<Void>) {

        RxPersistence
                .createEntityManagerFactory("initiatives")
                .flatMap {
                    log.info("Done createEntityManagerFactory {}", it)
                    App.entityManagerFactory = it
                    vertx.rxDeployVerticle(UsersVerticle::class.java.name)
                }.flatMap {
                    log.info("Started {}", it)

                    vertx.rxDeployVerticle(HttpServerVerticle::class.java.name,
                            DeploymentOptions().setInstances(2))
                }.subscribe(
                        {
                            log.info("Started {}", it)
                            startFuture.complete()
                        },
                        {
                            log.error("Fail {}", it.message)
                            startFuture.fail(it)
                        }
                )
    }

    companion object {
        lateinit var entityManagerFactory: EntityManagerFactory

        @JvmStatic
        fun main(args: Array<String>) {
            System.setProperty("org.jboss.logging.provider", "slf4j")
            System.setProperty("hazelcast.logging.type", "slf4j")


            Vertx.clusteredVertx(VertxOptions().setClustered(true)) { cluster ->
                if (cluster.succeeded()) {
                    val vertx = cluster.result()

                    RxJavaPlugins.setComputationSchedulerHandler({ s -> RxHelper.scheduler(vertx) })
                    RxJavaPlugins.setIoSchedulerHandler({ s -> RxHelper.blockingScheduler(vertx) })
                    RxJavaPlugins.setNewThreadSchedulerHandler({ s -> RxHelper.scheduler(vertx) })
                    RxJavaPlugins.lockdown()

                    vertx.deployVerticle(App::class.java.name, DeploymentOptions().setConfig(JsonObject().put("local", true)))
                }
            }
        }
    }
}

Upvotes: 2

Related Questions