user3139545

Reputation: 7374

RxJava/RxKotlin split stream depending on subtype

I have a stream of ResponseMessage objects, which can be of different subtypes. I would like to split it into separate streams so that each subtype is handled in its own stream.

My first attempt is below, but I cannot see how to make it work.

file.readLines()
        .toObservable()
        .map { mapper.readValue(it, ResponseMessage::class.java) }
        .groupBy { when(it) {
            is MarketChangeMessage -> it::class
            else -> it::class
        }}
        .map { it.????? } // How could this possibly work?

My question is: what is the idiomatic way to split a stream into separate streams based on a specific subtype?

Upvotes: 0

Views: 385

Answers (1)

ESala

Reputation: 7058

You could use the ofType operator:

ofType( ) — emit only those items from the source Observable that are of a particular class.

Example:

val messages = file.readLines()
    .toObservable()
    .map { mapper.readValue(it, ResponseMessage::class.java) }
    .share() // <-- or other multicasting operator

messages
    .ofType(MarketChangeMessage::class.java) // RxJava's ofType takes a java.lang.Class
    .subscribe()

messages
    .ofType(Other::class.java)
    .subscribe()
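
One detail about the multicasting operator: with a synchronous source such as a list of lines, share() connects as soon as the first subscriber arrives, so the whole sequence is emitted before the second subscriber is attached, and the upstream is then run again for that second subscriber. A minimal sketch of the alternative, publish() followed by an explicit connect(), is shown below. It assumes RxJava 2 on the classpath (io.reactivex.Observable; RxJava 3 uses io.reactivex.rxjava3.core instead). The message classes, their fields, and splitBySubtype are hypothetical stand-ins, not the asker's real types.

```kotlin
import io.reactivex.Observable

// Hypothetical stand-ins for the question's message hierarchy.
sealed class ResponseMessage
data class MarketChangeMessage(val id: Int) : ResponseMessage()
data class OtherMessage(val text: String) : ResponseMessage()

// Splits one source into two typed branches and collects what each branch receives.
fun splitBySubtype(
    source: List<ResponseMessage>
): Pair<List<MarketChangeMessage>, List<OtherMessage>> {
    val marketChanges = mutableListOf<MarketChangeMessage>()
    val others = mutableListOf<OtherMessage>()

    // publish() holds back emission until connect() is called,
    // so both branches can subscribe before any item flows.
    val messages = Observable.fromIterable(source).publish()

    // Each branch only sees items of its own subtype, already cast.
    messages.ofType(MarketChangeMessage::class.java)
        .subscribe { marketChanges += it }
    messages.ofType(OtherMessage::class.java)
        .subscribe { others += it }

    // The source is synchronous, so it runs to completion inside connect().
    messages.connect()
    return marketChanges to others
}

fun main() {
    val (marketChanges, others) = splitBySubtype(
        listOf(MarketChangeMessage(1), OtherMessage("heartbeat"), MarketChangeMessage(2))
    )
    println(marketChanges)
    println(others)
}
```

With this arrangement the upstream is traversed once and each item reaches only the branch matching its type. If RxKotlin is available, its reified ofType<MarketChangeMessage>() extension should be usable in place of the Class argument.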

Upvotes: 3
