szantogab
szantogab

Reputation: 45

RxJava Observable to Completable, how to avoid toBlocking()

I am currently using RxJava on Android with Kotlin, but I have a problem and I can't solve without using toBlocking().

I have method in employee service which returns an Observable>:

fun all(): Observable<List<Employee>>

This is all and good since this Observable emits the new list of employees whenever an employee changes. But I'd like to generate a PDF file from the employees, which obviously doesn't need to run everytime an employee changes. Also, I'd like to return a Completable object from my PDF generator method. I want to add a header to my PDF, and then iterate through the employees and calculate the wage of each employee, which also returns an Observable, and this is the place where I am using toBlocking right now. My current approach is this:

private fun generatePdf(outputStream: OutputStream): Completable {
    return employeeService.all().map { employees ->
        try {
                addHeaderToPDF()
                for (i in employees) {
                    val calculated = employeeService.calculateWage(i.id).toBlocking().first()
                    // Print calculated to PDF....
                }
                addFooterToPDF()
                return @map Completable.complete()
            }
            catch (e: Exception) {
                return @map Completable.error(e)
            }
        }.first().toCompletable()

Is there any way to make this code a little cleaner using RxJava?

Thanks in advance!

Upvotes: 4

Views: 3519

Answers (2)

Eugen Pechanec
Eugen Pechanec

Reputation: 38223

Disclaimer: This answer is a work in progress.


Basic premise: If you have blocking in the stream, you're doing it wrong.

Note: No state must leave the observable lambda.

Step 1: Stream the whole data set

The input is a stream of employees. For each employee you need to get one wage. Let's make it into one stream.

/**
 * @param employeesObservable
 * Stream of employees we're interested in.
 * @param wageProvider
 * Transformation function which takes an employee and returns a [Single] of their wage.
 * @return
 * Observable stream spitting individual [Pair]s of employees and their wages.
 */
fun getEmployeesAndWagesObservable(
        employeesObservable: Observable<Employee>,
        wageProvider: Function<Employee, Single<Int>>
): Observable<Pair<Employee, Int>>? {
    val employeesAndWagesObservable: Observable<Pair<Employee, Int>>

    // Each Employee from the original stream will be converted
    // to a Single<Pair<Employee, Int>> via flatMapSingle operator.
    // Remember, we need a stream and Single is a stream.
    employeesAndWagesObservable = employeesObservable.flatMapSingle { employee ->
        // We need to get a source of wage value for current employee.
        // That source emits a single Int or errors.
        val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee)

        // Once the wage from said source is loaded...
        val employeeAndWageSingle: Single<Pair<Employee, Int> = wageForEmployeeSingle.map { wage ->
            // ... construct a Pair<Employee, Int>
            employee to wage
        }

        // This code is not executed now. It will be executed for each Employee
        // after the original Observable<Employee> starts spitting out items.
        // After subscribing to the resulting observable.
        return@flatMapSingle employeeAndWageSingle
    }

    return employeesAndWagesObservable
}

What's going to happen when you subscribe:

  1. Take an employee from source.
  2. Fetch wage of an employee.
  3. Spit out a pair of employee and their wage.

This repeats until employeesObservable signals onComplete or something fails with onError.

Used operators:

  • flatMapSingle: Converts an actual value into a new Single stream of some transformed value.
  • map: Converts an actual value into some other actual value (no nested streams).

Hee's how you'd hook it up to your code:

fun doStuff() {
    val employeesObservable = employeeService.all()
    val wageProvider = Function<Employee, Single<Int>> { employee ->
        // Don't listen to changes. Take first wage and use that.
        employeeService.calculateWage(employee.id).firstOrError()
    }

    val employeesAndWagesObservable = 
            getEmployeesAndWagesObservable(employeesObservable, wageProvider)

    // Subscribe...
}

Used operators:

  • first: Take the first item from observable and turn it into a Single stream.
  • timeout: A good idea would be to .timeout the wage if you're getting it over network.

Next steps

Option 1: End here

Don't subscribe, call

val blockingIterable = employeesAndWagesObservable.blockingIterable()
blockingIterable.forEach { ... }

and process each item in a synchronous fashion. Sit back, figure out next steps, watch presentations, read examples.

Option 2: Add layers

  1. .map each of these Pair<Employee, Int> to some abstract PDF building block.
  2. Turn your header and footer printers to Observables via Observable.fromCallable { ... }, have them return PDF building blocks too.
  3. Merge all of these in a sequential manner via Observable.concat(headerObs, employeeDataObs, footerObs)
  4. .subscribe to this result and start writing the PDF building blocks to a PDF writer.
  5. TODO:
    • Figure out a way to initialize the PDF writer lazily on subscription (not before building the stream),
    • Delete output on error,
    • Close output stream on complete or on error.

Upvotes: 3

szantogab
szantogab

Reputation: 45

I came up with this:

    return employeeService.all().first()
            .doOnSubscribe { addHeaderToPDF() }
            .flatMapIterable { it }
            .flatMap { employeeService.calculateWage(it.id).first() }
            .doOnNext { printEmployeeWage(it) }
            .doOnCompleted { addFooterToPDF }
            .toCompletable()

Is this how it is supposed to be done? :)

Upvotes: 0

Related Questions