Reputation: 45
I am currently using RxJava on Android with Kotlin, but I have a problem that I can't solve without using toBlocking().
I have a method in my employee service which returns an Observable<List<Employee>>:
fun all(): Observable<List<Employee>>
This is all well and good, since this Observable emits a new list of employees whenever an employee changes. But I'd like to generate a PDF file from the employees, which obviously doesn't need to run every time an employee changes. Also, I'd like to return a Completable object from my PDF generator method. I want to add a header to my PDF, then iterate through the employees and calculate the wage of each employee, which also returns an Observable; this is where I am using toBlocking right now. My current approach is this:
private fun generatePdf(outputStream: OutputStream): Completable {
    return employeeService.all().map { employees ->
        try {
            addHeaderToPDF()
            for (i in employees) {
                val calculated = employeeService.calculateWage(i.id).toBlocking().first()
                // Print calculated to PDF....
            }
            addFooterToPDF()
            return@map Completable.complete()
        } catch (e: Exception) {
            return@map Completable.error(e)
        }
    }.first().toCompletable()
}
Is there any way to make this code a little cleaner using RxJava?
Thanks in advance!
Upvotes: 4
Views: 3519
Reputation: 38223
Disclaimer: This answer is a work in progress.
Basic premise: If you have blocking in the stream, you're doing it wrong.
Note: No state must leave the observable lambda.
The input is a stream of employees. For each employee you need to get one wage. Let's make it into one stream.
// RxJava 2 package names.
import io.reactivex.Observable
import io.reactivex.Single
import io.reactivex.functions.Function

/**
 * @param employeesObservable
 * Stream of employees we're interested in.
 * @param wageProvider
 * Transformation function which takes an employee and returns a [Single] of their wage.
 * @return
 * Observable stream spitting out individual [Pair]s of employees and their wages.
 */
fun getEmployeesAndWagesObservable(
    employeesObservable: Observable<Employee>,
    wageProvider: Function<Employee, Single<Int>>
): Observable<Pair<Employee, Int>> {
    val employeesAndWagesObservable: Observable<Pair<Employee, Int>>
    // Each Employee from the original stream will be converted
    // to a Single<Pair<Employee, Int>> via flatMapSingle operator.
    // Remember, we need a stream and Single is a stream.
    employeesAndWagesObservable = employeesObservable.flatMapSingle { employee ->
        // We need to get a source of wage value for current employee.
        // That source emits a single Int or errors.
        val wageForEmployeeSingle: Single<Int> = wageProvider.apply(employee)
        // Once the wage from said source is loaded...
        val employeeAndWageSingle: Single<Pair<Employee, Int>> = wageForEmployeeSingle.map { wage ->
            // ... construct a Pair<Employee, Int>
            employee to wage
        }
        // This code is not executed now. It will be executed for each Employee
        // after the original Observable<Employee> starts spitting out items.
        // After subscribing to the resulting observable.
        return@flatMapSingle employeeAndWageSingle
    }
    return employeesAndWagesObservable
}
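What's going to happen when you subscribe: an employee is taken from the source, the wage for that employee is loaded, and the pair of employee and wage is emitted.
This repeats until employeesObservable signals onComplete or something fails with onError.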
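To see that flow end to end, here is a minimal sketch, assuming RxJava 2 on the classpath. The Employee data class, the fakeWage function and the employeesWithWages name are hypothetical stand-ins made up for illustration; a real wage lookup would go through employeeService.calculateWage instead.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical stand-in for the question's Employee type (fields are assumptions).
data class Employee(val id: Int, val name: String)

// Hypothetical wage source: emits one Int per employee, synchronously.
fun fakeWage(employee: Employee): Single<Int> = Single.just(employee.id * 1000)

// Same idea as getEmployeesAndWagesObservable, condensed:
// each employee becomes a Single<Pair<Employee, Int>> that flatMapSingle merges back in.
fun employeesWithWages(employees: Observable<Employee>): Observable<Pair<Employee, Int>> =
    employees.flatMapSingle { employee ->
        fakeWage(employee).map { wage -> employee to wage }
    }

fun main() {
    val employees = Observable.just(Employee(1, "Ann"), Employee(2, "Bob"))
    // Nothing runs until subscribe is called.
    employeesWithWages(employees).subscribe(
        { (employee, wage) -> println("${employee.name}: $wage") }, // onNext, once per employee
        { error -> println("failed: $error") },                     // onError ends the stream
        { println("done") }                                         // onComplete after the last pair
    )
}
```

Because the fake wage source is synchronous, the pairs arrive in source order here. With wages loaded asynchronously, flatMapSingle may emit them in a different order.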
Used operators: flatMapSingle, map.
Here's how you'd hook it up to your code:
fun doStuff() {
    // all() emits a List<Employee> on every change. Take only the current list
    // and flatten it into a stream of individual employees.
    val employeesObservable = employeeService.all().take(1).flatMapIterable { it }
    val wageProvider = Function<Employee, Single<Int>> { employee ->
        // Don't listen to changes. Take first wage and use that.
        employeeService.calculateWage(employee.id).firstOrError()
    }
    val employeesAndWagesObservable =
        getEmployeesAndWagesObservable(employeesObservable, wageProvider)
    // Subscribe...
}
Used operators: firstOrError.
Use .timeout on the wage if you're getting it over the network.
Don't subscribe, call
val blockingIterable = employeesAndWagesObservable.blockingIterable()
blockingIterable.forEach { ... }
and process each item in a synchronous fashion. Sit back, figure out next steps, watch presentations, read examples.
.map each of these Pair<Employee, Int> to some abstract PDF building block.
Wrap the header and footer printers in Observable.fromCallable { ... } and have them return PDF building blocks too.
Combine all three with Observable.concat(headerObs, employeeDataObs, footerObs).
.subscribe to this result and start writing the PDF building blocks to a PDF writer.
Upvotes: 3
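The last steps of the answer above (map each pair to a PDF building block, wrap header and footer in fromCallable, concat, then subscribe) could look roughly like this. It is only a sketch, assuming RxJava 2: PdfBlock, Header, Footer, WageRow, pdfBlocks and the Employee fields are hypothetical names invented for illustration, and println stands in for a real PDF writer.

```kotlin
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical stand-in for the question's Employee type.
data class Employee(val id: Int, val name: String)

// Hypothetical "PDF building blocks"; a real app would map these onto its PDF library.
sealed class PdfBlock
object Header : PdfBlock()
object Footer : PdfBlock()
data class WageRow(val name: String, val wage: Int) : PdfBlock()

fun pdfBlocks(
    employees: Observable<Employee>,
    wageOf: (Employee) -> Single<Int>
): Observable<PdfBlock> {
    // Header and footer are produced lazily, only once the stream is subscribed.
    val headerObs = Observable.fromCallable<PdfBlock> { Header }
    val footerObs = Observable.fromCallable<PdfBlock> { Footer }
    // One row per employee, built from the employee and the loaded wage.
    val employeeDataObs = employees.flatMapSingle<PdfBlock> { employee ->
        wageOf(employee).map<PdfBlock> { wage -> WageRow(employee.name, wage) }
    }
    // concat subscribes to each source only after the previous one completes.
    return Observable.concat(headerObs, employeeDataObs, footerObs)
}

fun main() {
    val employees = Observable.just(Employee(1, "Ann"), Employee(2, "Bob"))
    pdfBlocks(employees) { employee -> Single.just(employee.id * 1000) }
        .subscribe(
            { block -> println("write $block") },                   // stand-in for the PDF writer
            { error -> println("PDF generation failed: $error") }
        )
}
```

concat guarantees header, then rows, then footer. The rows themselves can still arrive out of order if the wages are loaded asynchronously, so an order-preserving operator such as concatMap may be a better fit than flatMapSingle when row order matters.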
Reputation: 45
I came up with this:
return employeeService.all().first()
    .doOnSubscribe { addHeaderToPDF() }
    .flatMapIterable { it }
    .flatMap { employeeService.calculateWage(it.id).first() }
    .doOnNext { printEmployeeWage(it) }
    .doOnCompleted { addFooterToPDF() }
    .toCompletable()
Is this how it is supposed to be done? :)
Upvotes: 0