Jim C
Jim C

Reputation: 4405

RxJava: what is the recommended emitter for saving and retrieving data from NoSql DB asynchronously (i.e. reactive approach)

Context: it is my first time working with RxJava.

When coding with RxJava to (1) select documents from NoSql DB and (2) insert to NoSql (ex. MongoDb), what is the the recommended emmitter taking in account a Reactive Stack?

For instance, should I normally prefer use Flowable for reading and Single for saving?

This code works perrfectly to save a message received from a Kafka Topic to MongoDb but I am wondering if io.reactivex.Single is really the best way to accomplish it.

import com.mongodb.client.result.InsertOneResult
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Body
import io.reactivex.Observable
import io.reactivex.Single
import javax.inject.Inject
import io.reactivex.functions.Function

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {

    @Inject
    lateinit var mongoClient: MongoClient

    @Topic("debit")
    fun receive(@KafkaKey key: String, name: String) {

        save(key.toInt(), name)

    }

    private fun save( id: Int?,name: String?) {
        val debitMessage =  DebitMessage(id, name)
        Single
                .fromPublisher(getCollection().insertOne(debitMessage))
                .map<DebitMessage>(Function<InsertOneResult, DebitMessage> { debitMessage })
                .subscribe()
    }

    private fun getCollection(): MongoCollection<DebitMessage?> {
        return mongoClient
                .getDatabase("mydb")
                .getCollection("mycollection", DebitMessage::class.java)
    }
}

I came from Spring Data which is a bit straighforward a CRUD in Reactive world and, for not relevent reasons for this question, I will not use Spring and I am looking for best practices while writing/reading data on Reactive/No Blocking/BackPressure world.

Upvotes: 0

Views: 346

Answers (1)

Carson Holzheimer
Carson Holzheimer

Reputation: 2963

Your code looks fine. A Single makes sense for saving, since you will only get one result back. Flowable makes sense for reading, but really the choice is up to you and your application. Do you want to listen for database changes via Change Streams? Then you will have to use Flowable so you can react to multiple updates in the stream. It may be good practice to use Flowable, even if currently you don't listen to multiple updates, but you think you might in the future.

If you're sure you only ever want to deal with 1 event, use Single. It will save you some effort in your application code dealing with the possibility of multiple events.

Upvotes: 1

Related Questions