Reputation: 4405
Context: this is my first time working with RxJava.
When coding with RxJava to (1) select documents from a NoSQL DB and (2) insert into a NoSQL DB (e.g. MongoDB), what is the recommended emitter type, taking a reactive stack into account?
For instance, should I normally prefer Flowable for reading and Single for saving?
This code works perfectly to save a message received from a Kafka topic to MongoDB, but I am wondering whether io.reactivex.Single is really the best way to accomplish it.
import com.mongodb.client.result.InsertOneResult
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Body
import io.reactivex.Observable
import io.reactivex.Single
import javax.inject.Inject
import io.reactivex.functions.Function
@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {
    @Inject
    lateinit var mongoClient: MongoClient

    @Topic("debit")
    fun receive(@KafkaKey key: String, name: String) {
        save(key.toInt(), name)
    }

    private fun save(id: Int?, name: String?) {
        val debitMessage = DebitMessage(id, name)
        Single
            .fromPublisher(getCollection().insertOne(debitMessage))
            .map<DebitMessage>(Function<InsertOneResult, DebitMessage> { debitMessage })
            .subscribe()
    }

    private fun getCollection(): MongoCollection<DebitMessage?> {
        return mongoClient
            .getDatabase("mydb")
            .getCollection("mycollection", DebitMessage::class.java)
    }
}
I come from Spring Data, where reactive CRUD is fairly straightforward. For reasons not relevant to this question, I will not use Spring, and I am looking for best practices for writing and reading data in a reactive, non-blocking, backpressure-aware world.
Upvotes: 0
Views: 346
Reputation: 2963
Your code looks fine. A Single makes sense for saving, since you will only get one result back. Flowable makes sense for reading, but really the choice is up to you and your application. Do you want to listen for database changes via Change Streams? Then you will have to use Flowable so you can react to multiple updates in the stream. It may be good practice to use Flowable even if you don't currently listen to multiple updates but think you might in the future.
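As a rough illustration of the multi-event case, here is a minimal sketch assuming RxJava 2 (io.reactivex) is on the classpath. `fakeChanges` and `changeLog` are hypothetical names, and the in-memory publisher only stands in for a driver publisher that emits several events; no database is involved.

```kotlin
import io.reactivex.Flowable
import org.reactivestreams.Publisher

// Hypothetical stand-in for a publisher that emits several events,
// such as a change stream. It is not a real driver call.
fun fakeChanges(): Publisher<String> = Flowable.just("insert", "update", "delete")

// Flowable accepts any number of events from the publisher and supports backpressure.
fun changeLog(): Flowable<String> =
    Flowable.fromPublisher(fakeChanges()).map { event -> "seen:$event" }

fun main() {
    // Blocking here only keeps the sketch short; a real consumer would subscribe instead.
    changeLog().blockingForEach { println(it) }
}
```

Wrapping the publisher in a Flowable from the start means the calling code does not need to change if more events arrive later.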
If you're sure you only ever want to deal with 1 event, use Single. It will save you some effort in your application code dealing with the possibility of multiple events.
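The single-event case might look like the sketch below, again assuming RxJava 2. `fakeInsertOne` and `saveOne` are hypothetical names, and the in-memory publisher stands in for the driver's insert publisher rather than calling MongoDB.

```kotlin
import io.reactivex.Flowable
import io.reactivex.Single
import org.reactivestreams.Publisher

// Hypothetical stand-in for an insert that yields exactly one result.
fun fakeInsertOne(doc: String): Publisher<String> = Flowable.just("inserted:$doc")

// Exactly one result is expected, so the caller gets a Single and no list handling.
fun saveOne(doc: String): Single<String> = Single.fromPublisher(fakeInsertOne(doc))

fun main() {
    // Blocking here only keeps the sketch short; a real consumer would subscribe instead.
    println(saveOne("debit-1").blockingGet())
}
```

Note that in RxJava 2, Single.fromPublisher signals an error if the publisher completes without a value or emits more than one, so it is only a good fit when a single result is guaranteed.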
Upvotes: 1