Reputation: 6452
I have built an API for my app that sits behind a gateway with request throttling. Before I built the API my app coordinated requests itself and thus could start many many requests in milliseconds to synchronize data for the app across the 9 providers being used to fetch data. Now, this logic has been pushed up into my API adapter layer I need to think about how I can control the number of requests per second to avoid hitting my own rate limits. Increasing the rate limit is not an option since it would require a tier bump in the gateway provider that I am not willing to pay for.
In a bit to upskill in this strong movement within the Java community, I have opted to use RxJava, together with Retrofit and Retrolamba for the API SDK I have built. This has been largely successful and is operating in live without issue.
Now, my app allows users to save 'spots' that when synchronized retrieve local weather, tide and swell conditions for that area. Each spot uses 4 API resources to get a complete data-set, specifically;
/luna/locations/xtide/{id} - Luna Event detail (read: tide times)
/solar/locations/xtide/{id} - Solar Event detail (read: sunrise/sunset)
/water/locations/{provider}/{id}{?daysData} - Water Event detail (read: swell measures)
/meteo/wwo/weather{?query,daysData} - Meteo Event detail (read: weather data)
The app permits any number of spots, n meaning that with the current code I have 4n requests per spot. For example if I have 10 spots saved and attempt to sync all - I will cause 4*10 = 40 API requests to be fired in around 0.75s!
I am wanting to use Rx to simplify the process of self-throtteling my API requests. Here is a (hopefully accurate) marble chart of what I am wanting to achieve;
Figure 1: Marble chart showing desired stream composition
The SynchronisationService.java
code looks a bit like this;
Observable.zip(
Observable.from(spots),
Observable.interval(SYNC_TICK, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.subscribeOn(scheduler)
.observeOn(scheduler)
.unsubscribeOn(scheduler)
.flatMap(spot -> syncTidePosition.get().buildObservable(spot))
.subscribe(spotAndTideEvent -> new TideEventSubscriber(
lunaEventService,
synchronisationIntentProvider.get(),
spotAndTideEvent.spot,
String.format(
getString(string.tide_error_message),
spotAndTideEvent.spot.getTidePosition()
),
errorHandlerService,
localBroadcastManager)
);
...and the "buildObservable" call looks like this;
Observable<SpotAndTideEventTuple> buildObservable(final Spot spot) {
return Observable.zip(
Observable.just(spot),
lunaEventsProvider
.listTideTimes(
spot.getTideOperator(),
Integer.toString(spot.getTidePosition())
),
SpotAndTideEventTuple::new
);
}
...and the lunaEventsProvider.listTideTimes(...)
method looks like;
public Observable<List<TideEvent>> listTideTimes(@NonNull final LunaProvider provider,
@NonNull final String identifier) {
return getRetrofitServiceImpl(LunaEventsProviderDefinition.class)
.listTideTimes(provider, identifier)
.map(TideEventsTemplate::buildModels);
}
As an Rx amateur I've read much of the documentation to get this far but upon encountering an error with the code I'm at a loss as to where to go next. Either the subscription isn't causing the emissions to start (as with the snippets shown) or if I tweak things a little I get an unhelpful low-level NPE (rx.Scheduler
).
Where should I go from here? Am I on the right track with using Rx for the scenario described? Any help appreciated.
Upvotes: 5
Views: 290
Reputation: 6452
Somewhat embarassingly the NPE errors I was seeing were nothing to do with Rx, rather the scheduler I had specified to run the operation on was being injected into the android.app.Service
but owing to a slight 'misconfiguration' (omitting the @Inject
annotation!) the scheduler
variable was null.
Small comfort in knowing that the reason why I missed this was owing to the fact that my Scheduler
injection is also qualified meaning that it 'looked' the same as my other declarations at the top of the class;
@Inject @IoScheduler Scheduler scheduler;
@Inject LocalBroadcastManager localBroadcastManager;
@Inject NotificationManager notificationManager;
@Inject SharedPreferences sharedPrefs;
Well, I had fun building those marbles diagrams and pulling apart my understandings on Rx. The current call now coordinates all 4 of the API requests and looks like this;
Observable.zip(
Observable.from(spots),
Observable.interval(SYNC_TICK, TimeUnit.MILLISECONDS),
(obs, timer) -> obs)
.subscribeOn(scheduler)
.observeOn(scheduler)
.unsubscribeOn(scheduler)
.flatMap(this::buildObservable)
.subscribe(
new EventSubscriber(
lunaEventService,
solarService,
swellService,
conditionsService,
synchronisationIntentProvider.get(),
errorHandlerService,
localBroadcastManager,
TRENDING_LENGTH_DAYS
)
);
This is part way through the refactor of this service so I expect it to change a bit more, especially when it comes to getting the tests underneath green. Glad I stuck with it, using Rx literally removes ~50 to ~100 lines of code each time I learn a function!
Upvotes: 1