Reputation: 439
I'm working on an application that converts a list of objects into other objects; for this I use RxJava's Observable.
I have two methods. One is an adapter (a Jersey facade) that exposes an async service, and this method consumes an Observable from another service.
I need to consume an Observable and process each item. When it completes, I need to build one list of all the processed items and return a new Observable<E>, where E is that list.
To process each item I use the flatMap operator, but I don't know how to create a new Observable of a different type, for example a List of all the items processed by the flatMap operator.
Any ideas?
Thanks
Update:
This code processes each element and returns another Observable, but I don't know whether it is done properly.
@Override
public Observable<ArrayList> getGeoJson2() {
    WKTReader2 reader2 = new WKTReader2();
    WKBReader wkbReader = new WKBReader();
    ArrayList featureCollection = new ArrayList();
    Subject<ArrayList, ArrayList> subject = PublishSubject.create();
    manzanaRepository.getManzanas().map(new Func1<Manzana, SimpleFeature>() {
        @Override
        public SimpleFeature call(Manzana manzana) {
            try {
                SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
                return SimpleFeatureBuilder.build(TYPE, new Object[]{null, "name1"}, null);
            } catch (Exception e) {
                System.out.println(e.getMessage());
                return null;
            }
        }
    }).subscribe(new Subscriber<SimpleFeature>() {
        @Override
        public void onCompleted() {
            subject.onNext(featureCollection);
            subject.onCompleted();
        }
        @Override
        public void onError(Throwable throwable) {
            subject.onError(throwable);
        }
        @Override
        public void onNext(SimpleFeature simpleFeature) {
            featureCollection.add(simpleFeature);
        }
    });
    return subject;
}
And this is the code that consumes the returned Observable:
@GET
@Produces(MediaType.APPLICATION_JSON)
@Path("/async/geom")
public void asyncGetGeom(@Suspended final AsyncResponse asyncResponse) {
    Observable<ArrayList> features = service.getGeoJson2();
    features.subscribe(new Observer<ArrayList>() {
        @Override
        public void onCompleted() {
            System.out.println("The action completed!!!");
        }
        @Override
        public void onError(Throwable throwable) {
            System.out.println(throwable.getMessage());
        }
        @Override
        public void onNext(ArrayList features) {
            asyncResponse.resume(features);
        }
    });
}
The onNext() method is never called!
Thanks
Upvotes: 2
Views: 671
Reputation: 4077
The problem with your code is that you are using a PublishSubject, which has already emitted its list and completed before you start using it.
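If you change it to a subject that replays to late subscribers, such as ReplaySubject or AsyncSubject, it will work. (A BehaviorSubject that has already completed passes only the terminal notification to observers that subscribe afterwards.)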
However, it is better (and much shorter) to implement it as follows:
manzanaRepository.getManzanas().map(/**/).collect(/*create array*/, /*add next item*/)
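The timing issue described in this answer can be reproduced without RxJava. The sketch below is only an illustration: MiniPublishSubject and MiniReplaySubject are hypothetical stand-ins written for this example, not RxJava's classes, and they model nothing but the "emit first, subscribe later" ordering.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins for illustration only; these are NOT RxJava classes.
class LateSubscriberDemo {

    /** Forwards each item only to the listeners registered at the moment it is emitted. */
    static class MiniPublishSubject<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            listeners.add(listener);
        }

        void onNext(T item) {
            for (Consumer<T> listener : listeners) {
                listener.accept(item);
            }
        }
    }

    /** Remembers every item and replays the history to each new listener. */
    static class MiniReplaySubject<T> {
        private final List<Consumer<T>> listeners = new ArrayList<>();
        private final List<T> history = new ArrayList<>();

        void subscribe(Consumer<T> listener) {
            for (T item : history) {
                listener.accept(item);
            }
            listeners.add(listener);
        }

        void onNext(T item) {
            history.add(item);
            for (Consumer<T> listener : listeners) {
                listener.accept(item);
            }
        }
    }

    /** Emits first and subscribes afterwards, like the code in the question. */
    static List<String> lateOnPublish() {
        MiniPublishSubject<String> subject = new MiniPublishSubject<>();
        subject.onNext("features");          // the source has already finished
        List<String> received = new ArrayList<>();
        subject.subscribe(received::add);    // too late: nothing is delivered
        return received;
    }

    /** Same ordering, but the subject replays its history to the late listener. */
    static List<String> lateOnReplay() {
        MiniReplaySubject<String> subject = new MiniReplaySubject<>();
        subject.onNext("features");
        List<String> received = new ArrayList<>();
        subject.subscribe(received::add);    // history is replayed on subscribe
        return received;
    }

    public static void main(String[] args) {
        System.out.println("publish: " + lateOnPublish());
        System.out.println("replay:  " + lateOnReplay());
    }
}
```

Returning the operator chain itself, as suggested above, avoids the issue altogether because nothing runs until the caller subscribes.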
Upvotes: 0
Reputation: 1074
Try replacing your getGeoJson2() with this:
@Override
public Observable<List<SimpleFeature>> getGeoJson2() {
    return manzanaRepository.getManzanas()
        .map(new Func1<Manzana, SimpleFeature>() {
            @Override
            public SimpleFeature call(Manzana manzana) {
                try {
                    SimpleFeatureType TYPE = DataUtilities.createType("", "geom,name:String");
                    return SimpleFeatureBuilder.build(TYPE, new Object[]{null, "name1"}, null);
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                    return null;
                }
            }
        })
        // Drop the items whose conversion failed (call() returned null).
        .filter(new Func1<SimpleFeature, Boolean>() {
            @Override
            public Boolean call(SimpleFeature sf) {
                return sf != null;
            }
        })
        .toList();
}
Explanation: the toList() operator waits for onCompleted from the source Observable and then emits, as a single list, all the items that the source emitted.
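The behavior described above (buffer every item, emit one list on completion) can be modeled in a few lines of plain Java. This is a dependency-free sketch for illustration only: ToListSketch and ListCollector are hypothetical names, and this is not how RxJava implements toList().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the "collect, then emit once" behavior; not RxJava code.
class ToListSketch {

    /** Buffers items and hands the whole list downstream once, on completion. */
    static class ListCollector<T> {
        private final List<T> buffer = new ArrayList<>();
        private final Consumer<List<T>> downstream;

        ListCollector(Consumer<List<T>> downstream) {
            this.downstream = downstream;
        }

        void onNext(T item) {
            buffer.add(item);                 // nothing is emitted downstream yet
        }

        void onCompleted() {
            downstream.accept(new ArrayList<>(buffer)); // the single emission
        }
    }

    /** Feeds two items and a completion signal; returns everything emitted downstream. */
    static List<List<String>> run() {
        List<List<String>> emissions = new ArrayList<>();
        ListCollector<String> collector = new ListCollector<>(emissions::add);
        collector.onNext("a");
        collector.onNext("b");
        collector.onCompleted();
        return emissions;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The downstream consumer is called exactly once, which matches a resource method that resumes the AsyncResponse from a single onNext call.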
Upvotes: 1