Reputation: 5439
I'm implementing the CQRS pattern with event sourcing, using NServiceBus, NEventStore, and NES (which connects NSB and NEventStore).
My application regularly checks a web service for files to download and process. When a file is found, a command (DownloadFile) is sent to the bus and received by FileCommandHandler, which creates a new aggregate root (File) and handles the message.
Inside the File aggregate root I have to check that the file's content doesn't match the content of any other file (the web service guarantees only that file names are unique; the same content may appear under a different name). I do this by hashing the content and comparing it against the list of known content hashes.
The question is: where should I store the list of hashes? Is it acceptable to query the read model?
public class File : AggregateBase
{
    public File(DownloadFile cmd, IFileService fileDownloadService, IClaimSerializerService serializerService, IBus bus)
        : this()
    {
        // code to download the file content, deserialize it, and publish an event.
    }
}
public class FileCommandHandler : IHandleMessages<DownloadFile>, IHandleMessages<ExtractFile>
{
    public void Handle(DownloadFile command)
    {
        // For example, is it OK to do this? (Honestly, I feel it is not,
        // since the read model should always be considered stale.)
        var file = readModelContext.GetFileByHashCode(Hash(command.FileContent));
        if (file != null)
            throw new Exception("File content matches an already downloaded file");
        // There is no way to query the event store by file content, e.g.:
        // eventSourceRepository.Find<File>(c => c.HashCode == Hash(command.FileContent));
    }
}
Upvotes: 4
Views: 417
Reputation: 3414
Seems like you're looking for deduplication.
Your command side is where you want things to be consistent. Queries will always leave you open to race conditions. So, instead of running a query, I'd reverse the logic and actually write the hash into a database table (any db with ACID guarantees). If this write is successful, process the file. If the write of the hash fails, skip processing.
There's no point putting this logic into a handler, because retrying the message after a failure (i.e. trying to store the same hash again) will not make it succeed. You'd also end up with messages for duplicate files in the error queue.
A good place for the deduplication logic is likely inside your web service client. Some pseudo logic
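The original pseudo logic did not survive in this post, so what follows is only a minimal sketch of the "insert first, then process" idea described above, not the answerer's code. The names IContentHashStore, InMemoryContentHashStore, ContentHasher, and FilePoller are hypothetical, and the in-memory store merely stands in for a database table with a unique constraint on the hash column.

```csharp
using System;
using System.Collections.Concurrent;
using System.Security.Cryptography;

// Hypothetical abstraction over a table with a unique constraint on the hash.
public interface IContentHashStore
{
    // Returns true if the hash was inserted, false if it already existed.
    bool TryInsert(string hash);
}

// In-memory stand-in (assumption). A real implementation would INSERT into an
// ACID database and map a unique-constraint violation to "false".
public sealed class InMemoryContentHashStore : IContentHashStore
{
    private readonly ConcurrentDictionary<string, bool> _hashes =
        new ConcurrentDictionary<string, bool>();

    // TryAdd is atomic, so two concurrent callers cannot both get "true".
    public bool TryInsert(string hash)
    {
        return _hashes.TryAdd(hash, true);
    }
}

public static class ContentHasher
{
    public static string Hash(byte[] content)
    {
        using (var sha = SHA256.Create())
        {
            return Convert.ToBase64String(sha.ComputeHash(content));
        }
    }
}

// Hypothetical web service client side: deduplicate before sending a command.
public sealed class FilePoller
{
    private readonly IContentHashStore _store;
    private readonly Action<string, byte[]> _sendDownloadFile;

    public FilePoller(IContentHashStore store, Action<string, byte[]> sendDownloadFile)
    {
        _store = store;
        _sendDownloadFile = sendDownloadFile;
    }

    // Returns true if the file was forwarded, false if it was skipped as a duplicate.
    public bool Submit(string fileName, byte[] content)
    {
        // The write is the check: no separate query, so no race window.
        if (!_store.TryInsert(ContentHasher.Hash(content)))
        {
            return false;
        }

        _sendDownloadFile(fileName, content); // e.g. bus.Send(new DownloadFile(...))
        return true;
    }
}
```

With this shape, submitting the same bytes under two different file names forwards only the first one; the second call returns false and no command reaches the bus, so nothing lands in the error queue.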
Some example deduplication code in NServiceBus gateway here
Edit:
Looking at their code, I actually think the session.Get<DeduplicationMessage> call is unnecessary. session.Save(gatewayMessage); should be enough and is the consistency boundary.
Doing a query would make sense only if the rate of failure is high, meaning you have a lot of duplicate content files. If 99%+ of inserts succeed, the duplicates can indeed be treated as exceptions.
Upvotes: 1
Reputation: 2893
When you have a true uniqueness-constraint in your domain, you can make the uniqueness-tester a domain service, whose implementation is part of the infrastructure -- similar to a repository, whose interface is part of the domain and whose implementation is part of the infrastructure. For the implementation, you can then use an in-memory hash or a database that is updated/queried as needed.
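As a rough illustration of that layering, the sketch below puts an interface in the domain and an in-memory implementation in the infrastructure. All names (IContentUniquenessChecker, InMemoryContentUniquenessChecker, DuplicateFileContentException, FileRecord) are hypothetical, and FileRecord is a simplified stand-in for the File aggregate without any event-sourcing plumbing.

```csharp
using System;
using System.Collections.Generic;

// Domain layer: the interface belongs to the domain model.
public interface IContentUniquenessChecker
{
    // Atomically claims the hash; returns false when it was already claimed.
    bool TryClaim(string contentHash);
}

// Infrastructure layer: in-memory implementation guarded by a lock.
// A database-backed implementation could replace it without touching the domain.
public sealed class InMemoryContentUniquenessChecker : IContentUniquenessChecker
{
    private readonly HashSet<string> _claimed = new HashSet<string>();
    private readonly object _gate = new object();

    public bool TryClaim(string contentHash)
    {
        lock (_gate)
        {
            // HashSet.Add returns false if the element is already present.
            return _claimed.Add(contentHash);
        }
    }
}

public sealed class DuplicateFileContentException : Exception
{
    public DuplicateFileContentException(string contentHash)
        : base("File content already downloaded: " + contentHash)
    {
    }
}

// Simplified stand-in for the File aggregate (assumption).
public sealed class FileRecord
{
    public string Name { get; }
    public string ContentHash { get; }

    private FileRecord(string name, string contentHash)
    {
        Name = name;
        ContentHash = contentHash;
    }

    // The domain service is passed in, much like a repository would be.
    public static FileRecord Create(string name, string contentHash, IContentUniquenessChecker checker)
    {
        if (!checker.TryClaim(contentHash))
        {
            throw new DuplicateFileContentException(contentHash);
        }

        return new FileRecord(name, contentHash);
    }
}
```

The design choice here is to claim and check in one operation (TryClaim) rather than exposing a separate "exists" query, which would reintroduce the race between checking and writing.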
Upvotes: 0
Reputation: 2990
This depends on a lot of things, throughput being one of them. But since you're approaching this problem in a "pull based" fashion anyway (you're polling a web service for work, i.e. downloading and analysing a file), you could make this whole process serial without having to worry about collisions. That might not give you the rate at which you want to handle "the work", but more importantly: have you measured?
Let's sidestep that for a minute and assume that serial isn't going to work. How many files are we talking about? A few hundred, a thousand, millions? Depending on that, the hashes might fit into memory and could be rebuilt if and when the process goes down.
There might also be an opportunity to partition your problem along the axis of time or context. Every file since the dawn of time, or just today's, or maybe this month's worth of files?
Really, I think you should dig deeper into your problem space. Apart from that, this feels like an awkward problem to solve using event sourcing, but YMMV.
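To make the "in memory, rebuildable, partitioned by time" idea concrete, here is a small sketch under stated assumptions: the FileDownloaded event shape and the MonthlyHashIndex class are hypothetical, partitioning is by calendar month, and the index is meant for serial (single-threaded) use. Note that with this partitioning a duplicate is only detected within the same month.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;

// Hypothetical shape of a past event that carries the content hash.
public sealed class FileDownloaded
{
    public FileDownloaded(string contentHash, DateTime downloadedAtUtc)
    {
        ContentHash = contentHash;
        DownloadedAtUtc = downloadedAtUtc;
    }

    public string ContentHash { get; }
    public DateTime DownloadedAtUtc { get; }
}

// In-memory index of content hashes, one set per calendar month.
// Not thread-safe: intended for the serial processing described above.
public sealed class MonthlyHashIndex
{
    private readonly Dictionary<string, HashSet<string>> _byMonth =
        new Dictionary<string, HashSet<string>>();

    private static string Key(DateTime utc)
    {
        return utc.ToString("yyyy-MM", CultureInfo.InvariantCulture);
    }

    // Returns true if the hash is new for that month, false if it is a duplicate.
    public bool TryAdd(string contentHash, DateTime utc)
    {
        var key = Key(utc);
        HashSet<string> hashes;
        if (!_byMonth.TryGetValue(key, out hashes))
        {
            hashes = new HashSet<string>();
            _byMonth[key] = hashes;
        }

        return hashes.Add(contentHash);
    }

    // Rebuilds the index after a restart by replaying past events.
    public static MonthlyHashIndex Rebuild(IEnumerable<FileDownloaded> history)
    {
        var index = new MonthlyHashIndex();
        foreach (var e in history)
        {
            index.TryAdd(e.ContentHash, e.DownloadedAtUtc);
        }

        return index;
    }
}
```

After Rebuild, calling TryAdd with a hash already seen in the same month returns false, while the same hash in a different month is accepted; whether that trade-off is acceptable depends on the problem space.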
Upvotes: 0