Reputation: 51
I am fairly new to rxJava. in my api response, I get information about total number of pages and current page number like:
"pages": 22,
"page": 1,
i am using Retrofit to do api calls in data layer, my api service is like:
@GET("/story")
Observable <StoryCollectionEntity> storyCollection(
@Query("feed_ids") String feed_items,
@Query("page") int page);
then:
public Observable<StoryCollectionEntity> storyCollection() {
return mUserApi.storyCollection(items,page);
}
i did the subscription in domain layer like this:
public void execute(Subscriber UseCaseSubscriber) {
this.subscription = this.buildUseCaseObservable()
.subscribeOn(Schedulers.from(threadExecutor))
.observeOn(postExecutionThread.getScheduler())
.subscribe(UseCaseSubscriber);
}
@Override public Observable buildUseCaseObservable() {
return this.userRepository.stories();
}
i am figuring out on how can i make this observer reacts to recyclerView scrolling event by emitting the result of next page. i.e page 2 to second scrolling event and page 3 on 3rd scrolling ...etc
Upvotes: 2
Views: 4813
Reputation: 3740
I'll paste below the solution I'm using - which also uses Retrofit. In the example below, I'm using Retrofit to create an Observable of all public repositories from the GitHub API.
I believe the trick here is that I am recursing on the Observable and concatenating with another Observable is essentially a trigger.
This trigger can emit a variety of ways (perhaps when the user scrolls to the bottom of the page) however in this example, it immediately emits once a full page has been handled.
/***
* This is a helper wrapper Subscriber that helps you lazily defer
* continuous paging of a result set from some API.
* Through the use of a {@link Subject}, it helps notify the original {@link Observable}
* when to perform an additional fetch.
* The notification is sent when a certain count of items has been reached.
* Generally this count represents the page.
* @param <T> The event type
*/
@Data
public class PagingSubscriber<T> extends Subscriber<T> {
private final Subject<Void,Void> nextPageTrigger = PublishSubject.create();
private final long pageSize;
private long count = 0;
private final Subscriber<T> delegate;
/***
* Creates the {@link PagingSubscriber}
* @param pageSize
* @param delegate
*/
public PagingSubscriber(long pageSize, Subscriber<T> delegate) {
this.pageSize = pageSize;
this.delegate = delegate;
}
public Observable<Void> getNextPageTrigger() {
return nextPageTrigger;
}
@Override
public void onStart() {
delegate.onStart();
}
@Override
public void onCompleted() {
delegate.onCompleted();
}
@Override
public void onError(Throwable e) {
delegate.onError(e);
}
@Override
public void onNext(T t) {
count+=1;
if (count == pageSize) {
nextPageTrigger.onNext(null);
count= 0;
}
delegate.onNext(t);
}
}
@Data
public class GitHubRepositoryApplication {
private final GitHubService gitHubService;
@Inject
public GitHubRepositoryApplication(GitHubService githubService) {
this.gitHubService = githubService;
}
public Observable<GitHubRepository> printAllRepositories(Observable<Void> nextPageTrigger) {
return printRepositoryPages(GitHubService.FIRST_PAGE, nextPageTrigger)
.flatMapIterable(r -> r.body());
}
public Observable<Response<List<GitHubRepository>>> printRepositoryPages(String startingPage, Observable<Void> nextPageTrigger) {
return gitHubService.listRepos(startingPage)
.concatMap(response -> {
Optional<String> nextPage = Optional.ofNullable(response.headers().get(HttpHeaders.LINK))
.flatMap(header -> GitHubServiceUtils.getNextToken(header));
if (!nextPage.isPresent()) {
return Observable.just(response);
}
return Observable.just(response)
.concatWith(nextPageTrigger.limit(1).ignoreElements().cast(Response.class))
.concatWith(printRepositoryPages(nextPage.get(), nextPageTrigger));
});
}
public static void main(String[] args) {
Injector injector = Guice.createInjector(new GitHubModule());
GitHubRepositoryApplication app = injector.getInstance(GitHubRepositoryApplication.class);
Subscriber<GitHubRepository> subscriber = new Subscriber<GitHubRepository>() {
private final Logger log = LoggerFactory.getLogger(getClass());
@Override
public void onStart() {
log.debug("STARTING");
request(1l);//we need to begin the request
}
@Override
public void onCompleted() {
log.debug("COMPLETED");
}
@Override
public void onError(Throwable e) {
log.error("ERROR",e);
}
@Override
public void onNext(GitHubRepository gitHubRepository) {
log.debug("{}",gitHubRepository);
request(1l);//we need to make sure we have asked for another element
}
};
PagingSubscriber<GitHubRepository> pagingSubscriber = new PagingSubscriber<>(GitHubService.PAGE_SIZE, subscriber);
//In order for the JVM not to quit out, we make sure we turn our Observable to
//a BlockingObservable, so that all of it will finish.
Observable<GitHubRepository> observable =
app.printAllRepositories(pagingSubscriber.getNextPageTrigger());
observable.toBlocking().subscribe(pagingSubscriber);
}
}
Upvotes: 5