We have noticed excessive logging from the TrackingEventProcessor class when scaling the micro-service up to 2 replicas.
Our Axon setup:
- a custom TokenStore, i.e. CouchBaseTokenStore
- forSingleThreadedProcessing for the tracking processors
We are seeing the following messages repeatedly:
Segment is owned by another node. Releasing thread to process another segment...
No Worker Launcher active. Using current thread to assign segments.
Using current Thread for last segment worker: TrackingSegmentWorker{processor=core_policy-view, segment=Segment[0/0]}
Fetched token: GapAwareTrackingToken{index=2369007, gaps=[]} for segment: Segment[0/0]
Here is an example for a single processing group called core_policy-view:
This is from our staging environment. We don't see the same behaviour in our Production environment, which runs even more replicas of the same micro-service (5 instead of 2), albeit of a previous version. The excessive logging is a concern because the same behaviour also causes the following query to be executed excessively against the event store:
SELECT min ( globalIndex ) FROM DomainEventEntry WHERE globalIndex > ?
Here is a graph of how this impacted our APM metrics when the micro-service was scaled up to 2 replicas:
By looking at the code in the TrackingEventProcessor class, I understand that the TrackingEventProcessor is responsible for assigning segments to the running instance. If a segment is already claimed by another instance, this instance should not keep trying to claim it, but that does not seem to be what happens. Sometimes the segment/token also ping-pongs between the two nodes.
At the moment I may not even be articulating the issue correctly, and I'm also unsure what questions to ask. I would appreciate it if anyone could shed some light on whether we are doing something wrong. I know the version of Axon we are using is old, and upgrading is on our roadmap, but for now we need to get our current changes into Production so that we can move forward and start the upgrade after that.
EDIT 1
Here are the methods in CouchBaseTokenStore that are responsible for claiming the token:
```java
@Override
public void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException {
    // Consider tracking CAS to avoid re-read and harness the CAS optimistic concurrency better.
    // Unfortunately, GapAwareTrackingToken is expected by RDBMS storage engines and can't be extended.
    JsonDocument doc = readOrCreateDocument(processorName, segment);
    if (GapAwareTrackingToken.class.isAssignableFrom(token.getClass())) {
        writeGapAwareToken((GapAwareTrackingToken) token, doc);
    } else {
        writeTrackingToken(token, doc);
    }
    // Writes the token together with the claim fields set in claimToken()
    this.axonStateBucket.upsert(doc);
}

@Override
public TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException {
    JsonDocument doc = readOrCreateDocument(processorName, segment);
    String tokenClass = doc.content().getString(TOKEN_CLASS_FIELD);
    if (tokenClass == null) {
        return readGapAwareToken(doc);
    } else {
        return readTrackingToken(doc);
    }
}

private JsonDocument readOrCreateDocument(String processorName, int segment) throws UnableToClaimTokenException {
    String docId = getId(processorName, segment);
    JsonDocument doc = this.axonStateBucket.get(docId);
    if (doc == null) {
        try {
            doc = createDocument(processorName, segment);
        } catch (DocumentAlreadyExistsException e) {
            // Another instance beat us to it, read new token
            // which will most likely not be claimable.
            doc = this.axonStateBucket.get(docId);
        }
    }
    // Throws if another node holds a claim that has not expired yet
    claimToken(doc);
    return doc;
}

private JsonDocument createDocument(String processorName, int segment) throws DocumentAlreadyExistsException {
    JsonObject content = JsonObject.create()
            .put(PROCESSOR_NAME_FIELD, processorName)
            .put(SEGMENT_FIELD, segment)
            .put(TYPE_FIELD, TOKEN_TYPE)
            .put(CLAIM_EXPIRY_FIELD, formatInstant(Instant.now().plus(claimDuration)))
            .put(OWNER_FIELD, nodeName);
    JsonDocument doc = JsonDocument.create(getId(processorName, segment), content);
    return this.axonStateBucket.insert(doc);
}

// Updates the claim fields on the in-memory document; the caller is responsible for persisting it.
private void claimToken(JsonDocument document) throws UnableToClaimTokenException {
    String originalOwner = document.content().getString(OWNER_FIELD);
    Instant originalClaimExpiry = DateTimeUtils.parseInstant(document.content().getString(CLAIM_EXPIRY_FIELD));
    document.content()
            .put(CLAIM_EXPIRY_FIELD, formatInstant(Instant.now().plus(claimDuration)))
            .put(OWNER_FIELD, nodeName);
    if (nodeName.equals(originalOwner)) return;
    if (originalClaimExpiry.isAfter(clock.instant())) {
        throw new UnableToClaimTokenException(String.format("Claim for owner %s is still valid.", originalOwner));
    }
}
```
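To illustrate how a claim that is only updated in memory can let the segment bounce between nodes, here is a minimal, self-contained sketch. It models the bucket as an in-memory map and reduces the token document to an owner and a claim expiry; ClaimPersistenceDemo, Store, fetch() and the 10-second claim duration are all hypothetical, and an IllegalStateException stands in for UnableToClaimTokenException. It assumes that storeToken() (the only method above that writes the renewed claim back) is not called between two fetches, for example because no new events arrive in the meantime.

```java
import java.util.HashMap;
import java.util.Map;

class ClaimPersistenceDemo {

    static final long CLAIM_DURATION_MS = 10_000; // hypothetical claim duration

    /** Simplified token document: the claim owner and the claim expiry in epoch millis. */
    static final class Doc {
        String owner;
        long claimExpiry;

        Doc(String owner, long claimExpiry) {
            this.owner = owner;
            this.claimExpiry = claimExpiry;
        }

        Doc copy() {
            return new Doc(owner, claimExpiry);
        }
    }

    /** In-memory stand-in for the bucket: reads and writes copy the document, like a remote store. */
    static final class Store {
        private final Map<String, Doc> docs = new HashMap<>();

        Doc get(String id) {
            Doc doc = docs.get(id);
            return doc == null ? null : doc.copy();
        }

        void upsert(String id, Doc doc) {
            docs.put(id, doc.copy());
        }
    }

    /**
     * Simplified fetchToken(). With persistClaim == false the renewed claim only lives on
     * the local copy (as in the question); with true it is written back (as in the fix).
     */
    static void fetch(Store store, String id, String node, long now, boolean persistClaim) {
        Doc doc = store.get(id);
        if (doc == null) {
            // Creating the document stores the initial claim, like createDocument().
            store.upsert(id, new Doc(node, now + CLAIM_DURATION_MS));
            return;
        }
        // Same rule as claimToken(): reject if another node's claim has not expired yet.
        if (!node.equals(doc.owner) && doc.claimExpiry > now) {
            throw new IllegalStateException("Claim for owner " + doc.owner + " is still valid.");
        }
        doc.owner = node;
        doc.claimExpiry = now + CLAIM_DURATION_MS;
        if (persistClaim) {
            store.upsert(id, doc);
        }
    }

    /** nodeA fetches at t=0s and t=5s, then nodeB tries at t=12s. Describes nodeB's outcome. */
    static String scenario(boolean persistClaim) {
        Store store = new Store();
        String id = "core_policy-view:0";
        fetch(store, id, "nodeA", 0, persistClaim);
        fetch(store, id, "nodeA", 5_000, persistClaim);
        try {
            fetch(store, id, "nodeB", 12_000, persistClaim);
            return "nodeB claimed the segment";
        } catch (IllegalStateException e) {
            return "nodeB rejected: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("claim not persisted: " + scenario(false));
        System.out.println("claim persisted:     " + scenario(true));
    }
}
```

Without persistence, node A's renewal at t=5s never reaches the store, so the stored claim still expires at t=10s and scenario(false) reports that node B claimed the segment while node A still considers itself the owner. With persistence, the stored claim runs until t=15s and scenario(true) reports that node B was rejected.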
I managed to fix the problem with the help of Allard (see the comments on the question). The fix was to also persist the token document after it has been claimed in the fetchToken() method. We also switched from the upsert() method to the replace() method supplied by the Couchbase SDK, to better harness CAS (Compare-and-Swap) optimistic concurrency:
```java
@Override
public void storeToken(TrackingToken token, String processorName, int segment) throws UnableToClaimTokenException {
    JsonDocument doc = readOrCreateDocument(processorName, segment);
    if (GapAwareTrackingToken.class.isAssignableFrom(token.getClass())) {
        writeGapAwareToken((GapAwareTrackingToken) token, doc);
    } else {
        writeTrackingToken(token, doc);
    }
    // replace() only succeeds if the document still has the CAS value it was read with
    axonStateBucket.replace(doc);
}

@Override
public TrackingToken fetchToken(String processorName, int segment) throws UnableToClaimTokenException {
    JsonDocument doc = readOrCreateDocument(processorName, segment);
    axonStateBucket.replace(doc); // readOrCreateDocument method claims, so we need to persist that
    String tokenClass = doc.content().getString(TOKEN_CLASS_FIELD);
    if (tokenClass == null) {
        return readGapAwareToken(doc);
    } else {
        return readTrackingToken(doc);
    }
}
```
The rest of the code is unchanged from the code block in the question.
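The switch from upsert() to replace() matters when both nodes read the same expired claim at nearly the same time. The sketch below is again a hypothetical in-memory model, not the Couchbase SDK: every write bumps a version number that plays the role of the CAS value, replace() fails with a made-up CasMismatch exception when the version changed since the read, and upsert() writes unconditionally. The claim rule mirrors claimToken() from the question; all class and method names are illustrative.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class CasClaimDemo {

    static final long CLAIM_DURATION_MS = 10_000; // hypothetical claim duration

    /** Snapshot of a token document together with the version (CAS) it was read at. */
    static final class Doc {
        final String owner;
        final long claimExpiry;
        final long cas;

        Doc(String owner, long claimExpiry, long cas) {
            this.owner = owner;
            this.claimExpiry = claimExpiry;
            this.cas = cas;
        }
    }

    /** Hypothetical stand-in for a CAS mismatch reported by the store. */
    static final class CasMismatch extends RuntimeException {
        CasMismatch(String message) {
            super(message);
        }
    }

    /** In-memory store: every write produces a new, higher CAS value. */
    static final class Store {
        private final Map<String, Doc> docs = new HashMap<>();
        private long nextCas = 1;

        Doc get(String id) {
            return docs.get(id);
        }

        /** Unconditional write: the CAS of the caller's snapshot is ignored. */
        void upsert(String id, String owner, long claimExpiry) {
            docs.put(id, new Doc(owner, claimExpiry, nextCas++));
        }

        /** Conditional write: only succeeds if nobody wrote since `seen` was read. */
        void replace(String id, Doc seen, String owner, long claimExpiry) {
            Doc current = docs.get(id);
            if (current == null || current.cas != seen.cas) {
                throw new CasMismatch("document " + id + " was changed by another node");
            }
            docs.put(id, new Doc(owner, claimExpiry, nextCas++));
        }
    }

    /** Same rule as claimToken() in the question: own claim, or the old claim has expired. */
    static boolean mayClaim(Doc doc, String node, long now) {
        return node.equals(doc.owner) || doc.claimExpiry <= now;
    }

    /** Both nodes read the expired claim first, then write one after the other. */
    static String concurrentClaims(boolean useCas) {
        Store store = new Store();
        String id = "core_policy-view:0";
        store.upsert(id, "nodeA", 10_000); // nodeA's claim expired at t=10s
        long now = 12_000;
        Doc seenByB = store.get(id);
        Doc seenByA = store.get(id);

        List<String> believesOwner = new ArrayList<>();
        for (String node : List.of("nodeB", "nodeA")) {
            Doc seen = node.equals("nodeA") ? seenByA : seenByB;
            if (!mayClaim(seen, node, now)) {
                continue;
            }
            try {
                if (useCas) {
                    store.replace(id, seen, node, now + CLAIM_DURATION_MS);
                } else {
                    store.upsert(id, node, now + CLAIM_DURATION_MS);
                }
                believesOwner.add(node);
            } catch (CasMismatch e) {
                // A real token store would report this as "unable to claim".
            }
        }
        return String.join(",", believesOwner);
    }

    public static void main(String[] args) {
        System.out.println("upsert:  " + concurrentClaims(false));
        System.out.println("replace: " + concurrentClaims(true));
    }
}
```

With unconditional writes both nodes end up believing they own the segment (concurrentClaims(false) returns "nodeB,nodeA"), which is the kind of ping-pong described in the question; with the version check only nodeB succeeds. In a real token store the CAS failure would presumably need to surface as UnableToClaimTokenException so that the processor treats the segment as owned by another node; the answer does not show how that exception is handled.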