Reputation: 531
As a caveat, I'm a novice with Rx (two weeks in) and have been experimenting with Rx, RxUI and Roland Pheasant's DynamicData.
I have a service that initially loads data from local persistence and then, upon some user (or system) instruction, contacts the server (TriggerServer in the example) to get additional or replacement data. The solution I've come up with uses a Subject, and I've come across many sites discussing the pros and cons of using them. Although I understand the basics of hot vs. cold observables, that understanding comes from reading rather than real-world experience.
So, using the code below as a simplified version, is this the 'right' way of going about this problem, or is there something I haven't properly understood?
NB: I'm not sure how important it is, but the actual code is taken from a Xamarin.Forms app that uses RxUI, with the user input being a ReactiveCommand.
Example:
using DynamicData;
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;
public class MyService : IDisposable
{
private CompositeDisposable _cleanup;
private Subject<Unit> _serverSubject = new Subject<Unit>();
public MyService()
{
var data = Initialise().Publish();
AllData = data.AsObservableCache();
_cleanup = new CompositeDisposable(AllData, data.Connect());
}
public IObservableCache<MyData, Guid> AllData { get; }
public void TriggerServer()
{
// This is what I'm not sure about...
_serverSubject.OnNext(Unit.Default);
}
private IObservable<IChangeSet<MyData, Guid>> Initialise()
{
return ObservableChangeSet.Create<MyData, Guid>(async cache =>
{
// initial load - is this okay?
cache.AddOrUpdate(await LoadLocalData());
// is this a valid way of doing this?
var sync = _serverSubject.Select(_ => GetDataFromServer())
.Subscribe(async task =>
{
var data = await task.ConfigureAwait(false);
cache.AddOrUpdate(data);
});
return new CompositeDisposable(sync);
}, d=> d.Id);
}
private IObservable<MyData> LoadLocalData()
{
return Observable.Timer(TimeSpan.FromSeconds(3)).Select(_ => new MyData("localdata"));
}
private async Task<MyData> GetDataFromServer()
{
await Task.Delay(2000).ConfigureAwait(true);
return new MyData("serverdata");
}
public void Dispose()
{
_cleanup?.Dispose();
}
}
public class MyData
{
public MyData(string value)
{
Value = value;
}
public Guid Id { get; } = Guid.NewGuid();
public string Value { get; set; }
}
And a simple console app to run it:
public static class TestProgram
{
public static void Main()
{
var service = new MyService();
service.AllData.Connect()
.Bind(out var myData)
.Subscribe(_=> Console.WriteLine("data in"), ()=> Console.WriteLine("COMPLETE"));
while (Continue())
{
Console.WriteLine("");
Console.WriteLine("");
Console.WriteLine($"Triggering Server Call, current data is: {string.Join(", ", myData.Select(x=> x.Value))}");
service.TriggerServer();
}
}
private static bool Continue()
{
Console.WriteLine("Press any key to call server, x to exit");
var key = Console.ReadKey();
return key.Key != ConsoleKey.X;
}
}
Upvotes: 0
Views: 1749
Reputation: 531
I'm posting what I've come to as my solution, just in case there are others who find this while they're wandering the internets.
I ended up removing the Subjects altogether and chaining together several SourceCache instances, so that when one changed it pushed into the next, and so on. I've removed some code for brevity:
public class MyService : IDisposable
{
private SourceCache<MyData, Guid> _localCache = new SourceCache<MyData, Guid>(x=> x.Id);
private SourceCache<MyData, Guid> _serverCache = new SourceCache<MyData, Guid>(x=> x.Id);
public MyService()
{
var localdata = _localCache.Connect();
var serverdata = _serverCache.Connect();
var alldata = localdata.Merge(serverdata);
AllData = alldata.AsObservableCache();
}
public IObservableCache<MyData, Guid> AllData { get; }
public IObservable<Unit> TriggerLocal()
{
return LoadLocalAsync().ToObservable();
}
public IObservable<Unit> TriggerServer()
{
return LoadServerAsync().ToObservable();
}
}
EDIT: I've changed this again to remove any chaining of caches - I just manage the one cache internally. Lesson is not to post too early.
Upvotes: 0
Reputation: 2946
Looks very good for a first try with Rx.
I would suggest a few changes:
1) Remove the Initialize() call from the constructor and make it a public method - this helps a lot with unit tests, and you can now await it if you need to:
public static void Main()
{
var service = new MyService();
service.Initialize();
2) Add Throttle to your trigger - this stops rapid repeated triggers from making parallel calls to the server that return the same results
3) Don't do anything that can throw in Subscribe; use Do instead:
var sync = _serverSubject
.Throttle(TimeSpan.FromSeconds(0.5), RxApp.TaskPoolScheduler) // you can pass a scheduler via arguments, or use TestScheduler in unit tests to make time pass faster
.Do(async _ =>
{
var data = await GetDataFromServer().ConfigureAwait(false); // I just think this is more readable, your way was also correct
cache.AddOrUpdate(data);
})
// .Retry(); // or anything else to handle failures
.Subscribe();
Upvotes: 1