Reputation: 773
I am trying to write a class which will query a set of data out of a database every x seconds.
The issue is that this observable remembers its previous state, and that state determines what is actually sent on the observable.
For example, I want:
IObservable<SomeEntity> AddedEntities { get; private set; }
IObservable<SomeEntity> ChangedEntities { get; private set; }
IObservable<SomeEntity> DeletedEntities { get; private set; }
I have been reading http://www.introtorx.com/, which says that using Subject is a bad idea and that I should use the Observable.Create methods instead.
I want to query the database only once per interval and then send the relevant information to the correct observable. I don't want each subscription to start its own timer, which would query the database every 5 seconds per subscription to each IObservable.
I have played around with exposing just one observable, which returns a new model containing the changes as sets:
IObservable<EntityChangeSet> Changes {get; private set;}
public class EntityChangeSet
{
public IEnumerable<SomeEntity> Added {get; set;}
public IEnumerable<SomeEntity> Changed {get; set;}
public IEnumerable<SomeEntity> Deleted {get; set;}
}
And I am open to any solution like this as well.
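As a rough sketch of how the single change-set stream could still feed the three per-kind streams, each one can be derived with SelectMany, which flattens every set into individual entities. The `ChangeSetExtensions` class and its method names are hypothetical, and `SomeEntity` is reduced to a stand-in with only an `Id`:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Stand-in for the real entity type; only Id is assumed here.
public class SomeEntity
{
    public int Id { get; set; }
}

public class EntityChangeSet
{
    public IEnumerable<SomeEntity> Added { get; set; } = new List<SomeEntity>();
    public IEnumerable<SomeEntity> Changed { get; set; } = new List<SomeEntity>();
    public IEnumerable<SomeEntity> Deleted { get; set; } = new List<SomeEntity>();
}

// Hypothetical helpers: project one change-set stream into per-kind entity streams.
public static class ChangeSetExtensions
{
    public static IObservable<SomeEntity> AddedEntities(this IObservable<EntityChangeSet> changes)
    {
        // SelectMany emits each entity of the Added collection individually.
        return changes.SelectMany(c => c.Added);
    }

    public static IObservable<SomeEntity> ChangedEntities(this IObservable<EntityChangeSet> changes)
    {
        return changes.SelectMany(c => c.Changed);
    }

    public static IObservable<SomeEntity> DeletedEntities(this IObservable<EntityChangeSet> changes)
    {
        return changes.SelectMany(c => c.Deleted);
    }
}
```

With this shape, only the Changes stream has to deal with the timer and the database; the three entity streams are plain projections of it.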
What I have so far is:
public class IntervalChangeReader
{
// My state between ticks.
private IEnumerable<SomeEntity> knownEntities;
// Reads the data from the db and uses knownEntities to determine adds, changes,
// and deletes which are exposed through properties.
private DbReaderAndChangeChecker checker;
private IDisposable timerCancel;
public IntervalChangeReader(DbReaderAndChangeChecker checker)
{
this.checker = checker;
}
public IObservable<EntityChangeSet> Changes { get; private set; }
public void Start(int seconds)
{
this.timerCancel = Observable.Interval(TimeSpan.FromSeconds(seconds)).Subscribe(
x =>
{
var results = this.checker.Refresh(this.knownEntities);
// Update this.knownEntities with results.
// Produce a value on the observable here ????
// I could call .OnNext if I had a subject exposed for my observable.
});
}
public void Stop()
{
this.timerCancel.Dispose();
// Complete on all subscriptions?
// If I were using subjects I could just call .OnCompleted here.
}
}
How do I create the observable without using subjects, and how do I pass results through it? I want to do this all with only one timer, not one timer per subscription to my observables.
Upvotes: 0
Views: 522
Reputation: 773
I think I have found a solution to my issue.
public class IntervalChangeReader
{
// My state between ticks.
private readonly List<SomeEntity> knownEntities = new List<SomeEntity>();
// Reads the data from the db and uses knownEntities to determine adds, changes,
// and deletes which are exposed through properties.
private readonly DbReaderAndChangeChecker checker;
private EntityChangeSet lastChangeResult;
public IntervalChangeReader(DbReaderAndChangeChecker checker, int seconds, IScheduler scheduler)
{
this.checker = checker;
// I like to use Do instead of just combining this all into Select to make the side effect obvious.
this.Changes = Observable.Interval(TimeSpan.FromSeconds(seconds), scheduler).Do(
x =>
{
var changes = this.checker.Refresh(this.knownEntities);
// Apply the changes to knownEntities, which is my state.
this.knownEntities.AddRange(changes.Added);
this.knownEntities.RemoveAll(y => changes.Deleted.Any(z => z.Id == y.Id));
this.knownEntities.RemoveAll(y => changes.Changed.Any(z => z.Id == y.Id));
this.knownEntities.AddRange(changes.Changed);
this.lastChangeResult = changes;
}).Select(x => this.lastChangeResult);
}
public IObservable<EntityChangeSet> Changes { get; private set; }
}
Upvotes: 0
Reputation: 16894
I believe what you're after are the connectable-observable methods (Publish and RefCount). Play around with this (note: I'm on my phone, so my syntax is likely off a bit):
public void Start(int seconds)
{
// Every tick, generate a "changeset"
var changeCentral =
from tick in Observable.Interval(TimeSpan.FromSeconds(seconds))
let results = this.checker.Refresh(this.knownEntities)
select new EntityChangeSet(results);
// Publish means one subscription for all "connecting" subscribers
// RefCount means so long as one subscriber is subscribed, the subscription remains alive
var connector = changeCentral.Publish().RefCount();
Changes = connector;
}
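To see why the sharing matters: Observable.Interval is cold, so without Publish every subscriber would get its own timer and trigger its own refresh. Here is a minimal sketch of the same idea in method syntax. The `SharedPolling` class and `Poll` method are hypothetical names, and the tick source is passed in as a parameter so it can be swapped out for testing:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical helper illustrating Publish().RefCount(); not part of Rx itself.
public static class SharedPolling
{
    // Calls 'refresh' once per tick and shares the result with all subscribers.
    public static IObservable<TChanges> Poll<TChanges>(
        IObservable<long> ticks,
        Func<TChanges> refresh)
    {
        return ticks
            .Select(_ => refresh()) // runs once per tick of the shared subscription
            .Publish()              // multicast through a single underlying subscription
            .RefCount();            // connect on the first subscriber, disconnect after the last
    }
}
```

Usage would look something like `Changes = SharedPolling.Poll(Observable.Interval(TimeSpan.FromSeconds(seconds)), () => new EntityChangeSet(this.checker.Refresh(this.knownEntities)));`. One thing to keep in mind: once the last subscriber unsubscribes, RefCount drops the underlying subscription, and a later subscriber starts a fresh timer.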
Upvotes: 1