Reputation: 8311
I'm writing some kind of middleware HTTP proxy with cache. The workflow is:
My interfaces have Publisher<ByteBuffer>
stream for remote resource, cache which accepts Publisher<ByteBuffer>
to save, and clients' connection which accepts Publisher<ByteBuffer>
as a response:
// remote resource
interface Resource {
Publisher<ByteBuffer> fetch();
}
// cache
interface Cache {
Completable save(Publisher<ByteBuffer> data);
}
// clien response connection
interface Connection {
Completable send(Publisher<ByteBuffer> data);
}
My problem is that I need to lazy save this stream of byte buffers to cache when sending the response to the client, so the client should be responsible for requesting ByteByffer
chunks from remote resource, not cache.
I tried to use Publisher::cache
method, but it's not a good choice for me, because it keeps all received data in memory, it's not acceptable, since cached data may be few GB of size.
As a workaround, I created Subject
filled by next items received from Resource
:
private final Cache cache;
private final Connection out;
Completable proxy(Resource res) {
Subject<ByteBuffer> mirror = PublishSUbject.create();
return Completable.mergeArray(
out.send(res.fetch().doOnNext(mirror::onNext),
cache.save(mirror.toFlowable(BackpressureStrategy.BUFFER))
);
}
Is it possible to reuse same Publisher
without caching items in memory, and where only one subscriber will be responsible for requesting items from publisher?
Upvotes: 4
Views: 1289
Reputation: 1029
I might be missing something (added comment about my version of the Publisher
interface being different).
But.. here's how I would do something like this conceptually.
I'm going to simplify the interfaces to deal with Integers
:
// remote resource
interface Resource {
ConnectableObservable<Integer> fetch();
}
// cache
interface Cache {
Completable save(Integer data);
}
// client response connection
interface Connection {
Completable send(Integer data);
}
I'd use Observable::publish
to create a ConnectableObservable
and establish two subscriptions:
@Test
public void testProxy()
{
// Override schedulers:
TestScheduler s = new TestScheduler();
RxJavaPlugins.setIoSchedulerHandler(
scheduler -> s );
RxJavaPlugins.setComputationSchedulerHandler(
scheduler -> s );
// Mock interfaces:
Resource resource = () -> Observable.range( 1, 100 )
.publish();
Cache cache = data -> Completable.fromObservable( Observable.just( data )
.delay( 100, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Caching %d", data ))));
Connection connection = data -> Completable.fromObservable( Observable.just( data )
.delay( 500, TimeUnit.MILLISECONDS )
.doOnNext( __ -> System.out.println( String.format( "Sending %d", data ))));
// Subscribe to resource:
ConnectableObservable<Integer> observable = resource.fetch();
observable
.observeOn( Schedulers.io() )
.concatMapCompletable( data -> connection.send( data ))
.subscribe();
observable
.observeOn( Schedulers.computation() )
.concatMapCompletable( data -> cache.save( data ))
.subscribe();
observable.connect();
// Simulate passage of time:
s.advanceTimeBy( 10, TimeUnit.SECONDS );
}
Output:
Caching 1
Caching 2
Caching 3
Caching 4
Sending 1
Caching 5
Caching 6
Caching 7
Caching 8
Caching 9
Sending 2
Caching 10
. . .
Update
Based on your comments, it sounds like respecting backpressure is important in your case.
Let's say you have a Publisher
somewhere that honors backpressure, you can transform it into a Flowable
as follows:
Flowable<T> flowable = Flowable.fromPublisher( publisher );
Once you have a Flowable
you can allow for multiple subscribers without worrying about each subscriber having to request values from the Publisher
(or either subscriber from missing any events while establishing the subscriptions). You do that by calling flowable.publish()
to create a ConnectableFlowable
.
ConnectableFlowable<T> flowable = Flowable.fromPublisher( publisher ).publish();
out.send(flowable); // calls flowable.subscribe()
cache.save(flowable); // calls flowable.subscribe()
flowable.connect(); // begins emitting values
Upvotes: 1