Tair
Tair

Reputation: 3809

Implement a recurring job repeating not more than once per second

Suppose we have something like this:

while (true) {
  val job = Future { doSomething(); 1 }
  val timeout = Future { Thread.sleep(1000); 2 }

  val both = for (j <- job; t <- timeout) {
    println("Done")
  }
  Await.result(both)
}

What is the idiomatic solution for this using rx-java/scala?

UPDATE: a little more clarification, if not obvious from code.

Let tsn and ten be timestamp of start and end of a doSomething() job respectively.

Then the next job should be scheduled at tsn+1 = max ( ten , tsn + 1 second ).

Upvotes: 0

Views: 357

Answers (2)

akarnokd
akarnokd

Reputation: 69997

If I understand the problem correctly, you need to do recursive scheduling (as it seems you don't emit any value from the jobs). Here is an example how to do this with RxJava's Scheduler.Worker:

public class RecurringJob {
    static Subscription runJob(Runnable run) {
        Scheduler.Worker w = Schedulers.newThread().createWorker();
        MultipleAssignmentSubscription mas = new MultipleAssignmentSubscription();
        Action0 action = new Action0() {
            @Override
            public void call() {
                try {
                    run.run();
                    mas.set(w.schedule(this, 1, TimeUnit.SECONDS));
                } catch (Throwable t) {
                    t.printStackTrace();
                    w.unsubscribe();
                }
            }
        };
        mas.set(w.schedule(action, 1, TimeUnit.SECONDS));
        return mas;
    }
    public static void main(String[] args) throws InterruptedException {
        Subscription s = runJob(() -> System.out.println("Job"));
        Thread.sleep(10000);
        s.unsubscribe();
        System.out.println("Job stopped");
        Thread.sleep(3000);
        System.out.println("Done.");
    }
}

Upvotes: 0

Matias Saarinen
Matias Saarinen

Reputation: 606

After going through all the possibilities that RxScala and Observables offer I have to say that there may be a fundamental problem here: the subscriber of an observable should not control the emission of new values. The observable is a source of events and the subscriber is a passive consumer. Otherwise for example one subscriber could affect the output the observable emits to other subscribers.

If you still want to use observables this is the best solution I came up with. It zips the ready observable and the timer together so that it emits new event when both the timer and the job are done.

def run(job: () => Unit) {

  val ready = Observable.create{ observer =>
    for(
      j <- future {job(); 1};
    ) observer.onNext(); 
  }

  Observable.timer(1 seconds).zip(ready).subscribe{ value =>
    run();
  }

}

run(doSomenthing); 

Upvotes: 2

Related Questions