Reputation: 32936
Event store seems to be inconsistent when the catch up subscriptions are reconnecting. Based on a threads in the google group, and experimentation, it seems that catch up subscriptions will reconnect automatically if the SubscriptionDropReason is ConnectionClosed
.
However if you subsequently try and stop those connections and specify a timeout then they never seem to actually stop and always time out.
I used the code from this post but I believe this is misleading as in my experimentation the reason multiple events are received in this case is because event store automatically resubscribes to CatchupSubscriptions when event store comes back online, so as long as you don't resubscribe when the reason is ConnectionClosed then you won't receive the events multiple times. If I modify this code to add the timeout to the .Stop(timeout)
method then it just hangs and throws.
When I get the call to live processing started event after an automatic reconnection the EventStoreCatchUpSubscription
object has internal state which indicates that _isDropped=1
, which also seems strange to me, as its not dropped, its actually processing events.
So my question is how should I handle the case where eventstore has reconnected automatically and I want to drop the connection and wait for a timeout?
Upvotes: 3
Views: 1686
Reputation: 254
"subscriptionDropped" event is a good place to handle connectivity issue. There is still an open issue in event store project.
https://github.com/EventStore/EventStore/issues/929
https://github.com/EventStore/EventStore/issues/1127
eventStoreConnection.SubscribeToAllFrom(lastCheckpoint, catchUpSubscriptionSettings,
eventAppeared(projection),
liveProcessingStarted(projection),subscriptionDropped(projection),userCredentials );
private Action<EventStoreCatchUpSubscription, SubscriptionDropReason, Exception> subscriptionDropped(Projection projection)
=> async (eventStoreCatchUpSubscription, subscriptionDropReason, exception) =>
{
eventStoreCatchUpSubscription.Stop();
switch (subscriptionDropReason)
{
case SubscriptionDropReason.UserInitiated:
Console.WriteLine($"{projection} projection stopped by user.");
break;
case SubscriptionDropReason.SubscribingError:
case SubscriptionDropReason.ServerError:
case SubscriptionDropReason.ConnectionClosed:
case SubscriptionDropReason.CatchUpError:
case SubscriptionDropReason.ProcessingQueueOverflow:
case SubscriptionDropReason.EventHandlerException:
Console.WriteLine($"{projection} projection stopped because of a transient error ({subscriptionDropReason}). ");
Console.WriteLine($"Exception Detail: {exception}");
Console.WriteLine("Attempting to restart...");
// Re-build your subscription in here
Task.Run(() => StartProjection(projection));
break;
default:
Console.WriteLine("Your subscription gg");
Console.WriteLine($"Exception Detail: {exception}");
break;
}
};
Upvotes: 3