I'm new to NEventStore and to event sourcing in general. In a project I want to use NEventStore to persist the events generated by our aggregates, but I'm having trouble handling concurrency correctly.
How can I write to the same stream using an optimistic lock?
Let's say I have 2 instances of the same aggregate, loaded at revision 1 from 2 different threads. The first thread then calls command A and the second thread calls command B. With an optimistic lock, one of the two aggregates should fail with a concurrency exception.
I thought I could use maxRevision to open the stream at the revision the aggregate was loaded at, but it seems that CommitChanges never fails, even if I pass an old revision.
What am I missing? Is an optimistic lock possible/correct when using NEventStore/Event Sourcing?
Here is the code that I have used to reproduce the problem:
namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (var scope = new TransactionScope())
            using (store = WireupEventStore())
            {
                Client1(revision: 0);
                Client2(revision: 0);
                scope.Complete();
            }
            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
            return Wireup.Init()
                .UsingInMemoryPersistence()
                .Build();
        }

        private static void Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };
                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };
                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}
I expect client 2 to fail because I open the stream with an old revision.
UPDATE 26/08/2013: I have tested the same code using SQL Server and it seems to work as expected.
namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (store = WireupEventStore())
            {
                OpenOrCreateStream();
                AppendToStream_Client1(revision: 1);
                AppendToStream_Client2(revision: 1); // throws an error
                // AppendToStream_Client2(revision: 2); // works
            }
            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
            return Wireup.Init()
                .LogToOutputWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("EventStore") // Connection string is in app.config
                    .WithDialect(new MsSqlDialect())
                    .InitializeStorageEngine()
                .UsingJsonSerialization()
                .Build();
        }

        private static void OpenOrCreateStream()
        {
            using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
            {
                var @event = new SomeDomainEvent { Value = "Initial event." };
                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 1." };
                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 2." };
                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}
So, back to my question: to enable optimistic locking, should I pass the revision when opening the stream? Are there other possible implementations or guidelines?
Thanks
Upvotes: 3
Views: 2054
Firstly, the in-memory persistence implementation, whose primary purpose is testing, is not transaction aware. In your original example, client 2 will simply append its event to the stream. Try running the above with a persistence store that supports transactions (SQL and Raven, but not Mongo).
Secondly, specifying the min/max revision when opening a stream is used for different purposes:
[...] a ConcurrencyException will occur.
Support for most of this would be encapsulated in a domain framework. See AggregateBase and EventStoreRepository in CommonDomain.
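The idea behind such a concurrency exception can be sketched without NEventStore at all: a writer states the revision it expects the stream to be at, and the store rejects the append if the stream has moved on. The following is only a minimal illustration under that assumption. `TinyEventStore`, `StaleRevisionException`, and `Demo` are hypothetical names invented for this sketch and are not part of NEventStore or CommonDomain.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a store's concurrency error (not NEventStore's type).
public sealed class StaleRevisionException : Exception
{
    public StaleRevisionException(int expected, int actual)
        : base($"Expected revision {expected}, but the stream is at {actual}.") { }
}

// Hypothetical minimal in-memory store: one ordered event list per stream id.
public sealed class TinyEventStore
{
    private readonly object gate = new object();
    private readonly Dictionary<Guid, List<string>> streams =
        new Dictionary<Guid, List<string>>();

    // The current revision is the number of events committed so far.
    public int CurrentRevision(Guid streamId)
    {
        lock (gate)
        {
            return streams.TryGetValue(streamId, out var events) ? events.Count : 0;
        }
    }

    // Appends only if the caller's expected revision matches the stored one;
    // returns the new revision on success.
    public int Append(Guid streamId, int expectedRevision, string @event)
    {
        lock (gate)
        {
            if (!streams.TryGetValue(streamId, out var events))
            {
                events = new List<string>();
                streams[streamId] = events;
            }

            if (events.Count != expectedRevision)
            {
                throw new StaleRevisionException(expectedRevision, events.Count);
            }

            events.Add(@event);
            return events.Count;
        }
    }
}

public static class Demo
{
    // Mirrors the scenario in the question: two writers both loaded at revision 1.
    public static bool SecondWriterIsRejected()
    {
        var store = new TinyEventStore();
        var streamId = Guid.NewGuid();

        store.Append(streamId, 0, "Initial event.");   // stream is now at revision 1
        store.Append(streamId, 1, "Second event 1.");  // first writer succeeds

        try
        {
            store.Append(streamId, 1, "Second event 2."); // second writer is stale
            return false;
        }
        catch (StaleRevisionException)
        {
            return true;
        }
    }
}
```

In this sketch the losing writer would typically reload the stream, re-run its command against the newer state, and try again with the updated revision.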
Thirdly, and most importantly, updating more than one stream in a single transaction is a code smell. If you are doing DDD/ES, the stream represents a single aggregate root which, by definition, is a consistency boundary. Creating or updating more than one AR in a transaction breaks this. NEventStore's transaction support was (reluctantly) added so it could work with other tools, for example to transactionally read a command from MSMQ/NServiceBus/whatever and handle it, or to transactionally dispatch a commit message to a queue and mark it as dispatched. Personally, I would recommend that you do your best to avoid 2PC.
Upvotes: 6