Reputation: 13779
An application I am working on makes heavy use of asynchronous processing, and I am looking for a better way to organize the code.
The external input to the system is received on a servlet. The raw data collected by this servlet is deposited in to a queue. A thread pool runs against this queue, and parses the raw data into a structured record which is then deposited in to one of a set of N queues. The queue is chosen such that all records of the same kind go to the same queue. These N queues are serviced by a single thread each, which collects records of the same kind into a set. Every minute a scheduled tasks wakes up and writes into a file all records collected in the previous minute for each kind.
Currently, this code is organized using a bunch of queues, thread pools, and ever-running runnables, which makes the logic hard to follow. I’d like to refactor this code into something where the data-flow described above is more explicit. I am looking for tools and approaches to achieve that.
Upvotes: 0
Views: 483
Reputation: 20826
Here is an example of RxJava according to your description. Hope it would help you.
public class TestServlet extends HttpServlet {
private static final PublishSubject<String> o = PublishSubject.<String>create();
public static Observable<String> getServletObservable() {
return o.observeOn(Schedulers.computation());
}
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp)
throws ServletException, IOException {
synchronized (TestServlet.class) {
o.onNext("value");
}
}
}
class Foo {
public static void main(String[] args) {
TestServlet.getServletObservable().map(new Func1<String, String>() {
@Override
public String call(String t1) {
// do something
return null;
}
}).groupBy(new Func1<String, String>() {
@Override
public String call(String t1) {
// do something
return null;
}
}).subscribe(new Observer<GroupedObservable<String, String>>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(GroupedObservable<String, String> group) {
group.observeOn(Schedulers.io()).subscribe(new Observer<String>() {
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String t) {
// store t
}
});
}
});
}
}
Upvotes: 2