Martin Tarjányi
Martin Tarjányi

Reputation: 9997

Chain calls inifnitely

I have a service which needs an input and returns an output:

Item callService(Item input);

The input is always the previous element emitted by an observable and the output of the service call should be the next emitted element by the observable. And this sequence should be repeated until an item fulfills a certain condition.

Actually, I have the leaf of a tree and I send the leaf to a service which tells me what is its parent. And I go on with this until I get to the root.

My first solution:

I recursively call a flatMap function. I don't like this solution because of recursion:

Observable.just(Item.of("start"))
          .map(Result::of)
          .flatMap(App::fallbackUntilIsRoot)
          .blockingSingle();

private static Observable<Result> fallbackUntilIsRoot(Result previousResult)
{
    if (previousResult.isRoot())
    {
        return Observable.just(previousResult);
    } else
    {
        return Service.call(previousResult.getItem())
                      .flatMap(App::fallbackUntilIsRoot); // recursion
    }
}

My second solution:

I maintain an external queue, and basically I feed the observable with this queue. Still not looking very good.

Queue<Item> todoList = new LinkedList<>(Collections.singleton(Item.of("start")));

Result result = Observable.range(1, Integer.MAX_VALUE) //max tries
                          .concatMap(attempt -> Service.call(todoList))
                          .takeUntil(Result::isEmpty)
                          .takeLast(2)
                          .firstElement()
                          .blockingGet();

public class Service
{
    private static final Queue<Item> itemsProvidedByService = new LinkedList<>(
            Arrays.asList(Item.of("first"), Item.of("second"), Item.of("third"), Item.of("fourth"), Item.of("root")));

    public static Observable<Result> call(Queue<Item> todoList)
    {
        Item previousItem = todoList.poll();

        Item newItem = itemsProvidedByService.poll(); // we could pass previousItem to a real service here

        if (newItem == null)
        {
            return Observable.just(Result.empty());
        }

        todoList.add(newItem);

        return Observable.just(Result.of(newItem));
    }
}

Is there an Rx way of doing such chaining?

Upvotes: 1

Views: 120

Answers (2)

Martin Tarj&#225;nyi
Martin Tarj&#225;nyi

Reputation: 9997

Meanwhile, I found the name for my problem. It's called corecursion. RxJava 2 has a clean (side-effect-less) solution for this problem.

private static void solutionWithGenerate()
{
    Result startElement = Result.of(Item.of("start"));

    Result result = Observable.generate(() -> startElement, App::generate)
                              .startWith(startElement)
                              .lastElement()
                              .blockingGet();

    System.out.println(result);
}

private static Result generate(Result previousResult, Emitter<Result> emitter)
{
    Result newResult = Service.callDirectly(previousResult.getItem());

    if (newResult.isEmpty())
    {
        emitter.onComplete();
    } else
    {
        emitter.onNext(newResult);
    }

    return newResult;
}

Can be done in RxJava 1, too. A bit more verbose: Observable.Generate in RxJava?

Upvotes: 2

yosriz
yosriz

Reputation: 10267

you can do it by single side effect field that keeps the current iterated item.
combining repeat() with takeUntil() will make the while loop together with fromCallable() for making the iterator mutable in the loop (i.e. using just() will repeat the initial value forever):

it = Result.of(Item.of("start"));
Observable.fromCallable(() -> it)
        .map(result -> {
            it = service.call(result);
            return result;
        })
        .repeat()
        .takeUntil(Item -> it.isRoot())
        .subscribe(item -> .....);

Upvotes: 1

Related Questions