Reputation: 388
Below is the following code:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetObservable()
.Subscribe(observer);
}
}
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> GetObservable();
}
public class ClientConsumingProgram
{
class FooObserver : IObserver<FooData>
{
public void OnNext(FooData value)
{
//Client consuming without interruption
}
//.. on error.. onCompleted
}
public static void Main()
{
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
}
}
I want to implement following:
When transport service is disconnected (socket closed from server), I want application to retry for few times, but foo service first needs to call Connect
on _transportService
and then once State
is connected, call GetObservable.
Desired result is OnNext
on FooObserver
keeps on ticking on client side, if _transportService
is connect again before max retry, and once it's exceeds max error OnError should be fired.
Can someone point me to the right direction for implementing this?
UPDATE
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
public interface ITransportService
{
IObservable<ConnectionState> GetConnectionStateObservable();
bool Connect();
IObservable<FooData> GetObservable();
}
public class FooData
{
public int Id { get; set; }
public string Msg { get; set; }
}
public enum ConnectionState
{
Open,
Close
}
public class FooMockTransportService : ITransportService
{
public ConnectionState State { get; set; }
private BehaviorSubject<ConnectionState> _connectionSubject = new BehaviorSubject<ConnectionState>(ConnectionState.Close);
private bool _shouldDisconnect;
public FooMockTransportService()
{
_shouldDisconnect = true;
}
public bool Connect()
{
State = ConnectionState.Open;
_connectionSubject.OnNext(ConnectionState.Open);
return true;
}
public IObservable<ConnectionState> GetConnectionStateObservable()
{
return _connectionSubject.AsObservable();
}
public IObservable<FooData> GetObservable()
{
return Observable.Create<FooData>(
o=>
{
TaskPoolScheduler.Default.Schedule(() =>
{
o.OnNext(new FooData { Id = 1, Msg = "First" });
o.OnNext(new FooData { Id = 2, Msg = "Sec" });
//Simulate disconnection, ony once
if(_shouldDisconnect)
{
_shouldDisconnect = false;
State = ConnectionState.Close;
o.OnError(new Exception("Disconnected"));
_connectionSubject.OnNext(ConnectionState.Close);
}
o.OnNext(new FooData { Id = 3, Msg = "Third" });
o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
});
return () => { };
});
}
}
public class Program
{
class FooObserver : IObserver<FooData>
{
public void OnCompleted()
{
throw new NotImplementedException();
}
public void OnError(Exception error)
{
Console.WriteLine(error);
}
public void OnNext(FooData value)
{
Console.WriteLine(value.Id);
}
}
public static void Main()
{
var transportService = new FooMockTransportService();
var fooService = new FooService(transportService);
var fooObserver = new FooObserver();
var disposable = fooService.Subscribe(fooObserver);
Console.Read();
}
}
Code is compliable and also contains suggestions for Shlomo. Current output:
1
2
System.Exception: Disconnected
Desired output, on disconnect it should catch and retry every 1 sec as an example to see whether it's connected or not:
1
2
1
2
3
4
Upvotes: 2
Views: 713
Reputation: 117029
You cannot write Rx code that effectively executes like this:
o.OnNext(new FooData { Id = 1, Msg = "First" });
o.OnNext(new FooData { Id = 2, Msg = "Sec" });
o.OnError(new Exception("Disconnected"));
o.OnNext(new FooData { Id = 3, Msg = "Third" });
o.OnNext(new FooData { Id = 4, Msg = "Fourth" });
The contract for an observable is a stream of zero or more values that ends with either an error or a complete signal. They cannot emit further values.
Now, I appreciate that this code might be for testing purposes, but if you create an impossible stream you'll end up writing impossible code.
The correct approach is to use .Switch()
, as per Shlomo's answer. If there's some delay in connection then the GetConnectionStateObservable
should only return a value when it is connected. Shlomo's answer remains correct.
Upvotes: 3
Reputation: 14350
If you control ITransportService
, I would recommend adding a property:
public interface ITransportService
{
ConnectionState State { get; }
bool Connect();
IObservable<FooData> GetObservable();
IObservable<ConnectionState> GetConnectionStateObservable();
}
Once you can get the states in an observable fashion, producing the observable becomes easier:
public class FooService
{
private ITransportService _transportService;
public FooService(ITransportService transportService)
{
_transportService = transportService;
_transportService.Connect();
}
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
}
If you don't control ITransportService
, I would recommend creating an interface that inherits from it where you can add a similar property.
As an aside, I would recommend you ditch FooObserver
, you almost never need to fashion your own observer. Expose the observable, and calling a Subscribe
overload on the Observable normally does the trick.
I can't test any of this though: there's no clarity as to what the retry logic should be like, what the return value for Connect
means, or what the ConnectionState
class is, and the code doesn't compile. You should try to fashion your question as a mcve.
UPDATE:
The following handles the test code as expected:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
.Catch(Observable.Never<FooData>())
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch()
.Subscribe(observer);
}
Only change from the original posted code is the additional .Catch(Observable.Never<FooData>())
. As written, this code will run forever. I hope you have some way to terminate the observable external to what's posted.
Upvotes: 3
Reputation: 966
Elaborating my comment:
As Shlomo already shown in his answer how you can take advantage of connection state observable, what I guess you want is to subscribe it again when disconnect happen.
For that use Observable.Defer
return Observable.Defer(() => your final observable)
and now on disconnect if you want to subscribe again use Retry
return Observable.Defer(() => your final observable).Retry(3)
but you might need to delay your retries, either linearly or with exponential back-off strategy, for this use DelaySubscription
return Observable.Defer(() => your_final_observable.DelaySubscription(strategy)).Retry(3)
here's the final code, with retry every second:
public IDisposable Subscribe(IObserver<FooData> observer)
{
return Observable.Defer(() => {
return _transportService.GetConnectionStateObservable()
.Select(cs => cs == ConnectionState.Open)
.DistinctUntilChanged()
.Select(isOpen => isOpen
? _transportService.GetObservable() //if open, return observable
: Observable.Start(() => _transportService.Connect()) //if not open, call connect and wait for connection to open
.IgnoreElements()
.Select(_ => default(FooData))
.Concat(Observable.Never<FooData>())
)
.Switch().DelaySubscription(TimeSpan.FromSeconds(1));
})
.Retry(2)
.Subscribe(observer);
}
Some caveats to remember:
This DelaySubscription will delay first call also, so if it's a problem, create a count variable and only when count > 0 use DelaySubscription else use normal observable.
Upvotes: 2