Reputation: 404
I am developing an event-sourced Electric Vehicle Charging Station Management System, which is connected to several Charging Stations. In this domain, I've come up with an aggregate for the Charging Station, which includes the internal state of the Charging Station(whether it is network-connected, if a car is charging using the station's connectors).
The station notifies me about its state through messages defined in a standardized protocol:
And my server can send commands to this station:
I've developed an Aggregate for this Charging Station. It contains the internal entities of its connector, whether it's charging or not, if it has a problem in the power system, ...
And the Aggregate, which its memory representation resides in the server that I control, not in the Charging Station itself, has a StationClient
service, which is responsible for sending these commands to the physical Charging Station(pseudocode):
class StationAggregate {
stationClient: StationClient
URL: string
connector: Connector[]
unlock(connectorId) {
if this.connectors.find(connectorId).isAvailableToBeUnlocked() {
return ErrorConnectorNotAvailable
}
error = this.stationClient.sendRemoteStartTransaction(this.URL, connectorId)
if error {
return ErrorStationRejectedUnlock
}
this.applyEvents([
StationUnlockedEvent(connectorId, now())
])
return Ok
}
receiveHeartbeat(timestamp) {
this.applyEvents([
StationSentHeartbeat(timestamp)
])
return Ok
}
}
I am using a optimistic concurrency, which means that, I load the Aggregate from a list of events, and I store the current version of the Aggregate in its memory representation: StationAggregate in version #2032, when a command is successfully processed and event(s) applied, it would the in version #2033, for example. In that way, I can put a unique constraint on the (StationID, Version) tuple on my persistence layer, and guarantee that only one event is persisted.
If by any chance, occurs a receival of a Heartbeat message, and the receival of a Unlock command. In both threads, they would load the StationAggregate and would be both in version X, in the case of the Heartbeat receival, there would be no side-effects, but in the case of the Unlock command, there would be a side-effect that tells the physical Charging Station to be unlocked. However as I'm using optimistic concurrency, that StationUnlocked
event could be rejected from the persistence layer. I don't know how I could handle that, as I can't retry the command, because it its inherently not idempotent(as the physical Station would reject the second request)
I don't know if I'm modelling something wrong, or if it's really a hard domain to model.
Upvotes: 0
Views: 1002
Reputation: 19640
I am not sure I fully understand the problem, but the idea of optimistic concurrency is to prevent writes in case of a race condition. Versions are used to ensure that your write operation has the version that is +1 from the version you've got from the database before executing the command.
So, in case there's a parallel write that won and you got the wrong version exception back from the event store, you retry the command execution entirely, meaning you read the stream again and by doing so you get the latest state with the new version. Then, you give the command to the aggregate, which decides if it makes sense to perform the operation or not.
The issue is not particularly related to Event Sourcing, it is just as relevant for any persistence and it is resolved in the same way.
Event Sourcing could bring you additional benefits since you know what happened. Imagine that by accident you got the Unlock
command twice. When you got the "wrong version" back from the store, you can read the last event and decide if the command has already been executed. It can be done logically (there's no need to unlock if it's already unlocked, by the same customer), technically (put the command id to the event metadata and compare), or both ways.
When handling duplicate commands, it makes sense to ensure a decent level of idempotence of the command handling, ignore the duplicate and return OK instead of failing to the user's face.
Another observation that I can deduce from the very limited amount of information about the domain, is that heartbeats are telemetry and locking and unlocking are business. I don't think it makes a lot of sense to combine those two distinctly different things in one domain object.
Update, following the discussion in comments:
What you got with sending the command to the station at the same time as producing the event, is the variation of two-phase commit. Since it's not executed in a transaction, any of the two operations could fail and lead the system to an inconsistent state. You either don't know if the station got the command to unlock itself if the command failed to send, or you don't know that it's unlocked if the event persistence failed. You only got as far as the second operation, but the first case could happen too.
There are quite a few ways to solve it.
First, solving it entirely technical. With MassTransit, it's quite easy to fix using the Outbox. It will not send any outgoing messages until the consumer of the original message is fully completed its work. Therefore, if the consumer of the Unlock
command fails to persist the event, the command will not be sent. Then, the retry filter would engage and the whole operation would be executed again and you already get out of the race condition, so the operation would be properly completed.
But it won't solve the issue when your command to the physical station fails to send (I reckon it is an edge case).
This issue can also be easily solved and here Event Sourcing is helpful. You'd need to convert sending the command to the station from the original (user-driven) command consumer to the subscriber. You subscribe to the event stream of StationUnlocked
event and let the subscriber send commands to the station. With that, you would only send commands to the station if the event was persisted and you can retry sending the command as many times as you'd need.
Finally, you can solve it in a more meaningful way and change the semantics. I already mentioned that heartbeats are telemetry messages. I could expect the station also to respond to lock and unlock commands, telling you if it actually did what you asked.
You can use the station telemetry to create a representation of the physical station, which is not a part of the aggregate. In fact, it's more like an ACL to the physical world, represented as a read model.
When you have such a mirror of the physical station on your side, when you execute the Unlock
command in your domain, you can engage a domain server to consult with the current station state and make a decision. If you find out that the station is already unlocked and the session id matches (yes, I remember our previous discussion :)) - you return OK and safely ignore the command. If it's locked - you proceed. If it's unlocked and the session id doesn't match - it's obviously an error and you need to do something else.
In this last option, you would clearly separate telemetry processing from the business so you won't have heartbeats impact your domain model, so you really won't have the versioning issue. You also would always have a place to look at to understand what is the current state of the physical station.
Upvotes: 2