Reputation: 672
How can I make getting and setting a property in the datastore thread-safe?
Currently, I have code that puts tasks in a queue. Each task does its work and then updates an int property called numberOfTasks: it fetches the current value of the property and increments it.
However, as the tasks in the queue execute, the final value comes out wrong because of a threading issue. Sometimes two tasks try to update the property at the same time, and one of the increments is lost.
Could anyone please help me get this right?
Datastore Property Getter Method:
private String doGet(String rowId) throws EntityNotFoundException {
    Key egsKey = KeyFactory.createKey(DATASTORE_KIND, rowId);
    Entity egsEntity = datastore.get(egsKey);
    // schema changed from String to Text type. Transparently handle that here.
    Object propertyValue = egsEntity.getProperty(PROPERTY_KEY);
    if (propertyValue instanceof String) {
        return (String) propertyValue;
    }
    Text text = (Text) propertyValue;
    return text.getValue();
}
Datastore Property Setter Method:
private void doPut(String rowId, String value) {
    Entity entity = new Entity(DATASTORE_KIND, rowId);
    // The caller passes String.valueOf(...), and doGet reads the value back as a String.
    entity.setProperty(PROPERTY_KEY, value);
    datastore.put(entity);
}
Setter and Getter Methods:
public synchronized int getPendingUsersForProcessing() {
    String pendingUsersForProcessingAsString = null;
    try {
        pendingUsersForProcessingAsString = doGet(PENDING_USERS_FOR_PROCESSING);
        return Integer.parseInt(pendingUsersForProcessingAsString);
    } catch (NumberFormatException e) {
        throw new IllegalStateException("The number of pending users for processing in Datastore is not a number: "
                + pendingUsersForProcessingAsString, e);
    } catch (EntityNotFoundException e) {
        return DEFAULT_PENDING_USERS_FOR_PROCESSING;
    }
}

/** {@inheritDoc} */
@Override
public synchronized void setPendingUsersForProcessing(int pendingUsersForProcessing) {
    doPut(PENDING_USERS_FOR_PROCESSING, String.valueOf(pendingUsersForProcessing));
    LOG.info("Number of Pending Users For Processing is set to : " + pendingUsersForProcessing);
}
Code where I am trying to update the property:
int pendingUsers = appProperties.getPendingUsersForProcessing();
int requestUsers = request.getUserKeys().size();
appProperties.setPendingUsersForProcessing(pendingUsers + requestUsers);
Upvotes: 2
Views: 915
Reputation: 41089
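This is not exactly a threading issue: you may have multiple instances of your app executing the tasks, and those instances know nothing about each other, so a synchronized method in one instance cannot protect against a write from another. And even within a single instance, the read and the write are two separate calls, so another task can run between them. What you have is contention on a single entity.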
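To see why the synchronized getter and setter do not help, here is a minimal sketch that replays the unlucky interleaving by hand. It assumes nothing about your real code: `LostUpdateDemo` and its in-memory map are hypothetical stand-ins for the datastore, and the increments 5 and 3 are made-up values.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the datastore, for illustration only. */
final class LostUpdateDemo {
    private final Map<String, Integer> store = new HashMap<>();

    // Each accessor is synchronized on its own, like the getter and setter in the question.
    synchronized int get(String key) {
        return store.getOrDefault(key, 0);
    }

    synchronized void set(String key, int value) {
        store.put(key, value);
    }

    /** Replays the interleaving in which both tasks read before either one writes. */
    static int replayInterleaving() {
        LostUpdateDemo props = new LostUpdateDemo();
        int readByTaskA = props.get("pendingUsers"); // task A reads 0
        int readByTaskB = props.get("pendingUsers"); // task B also reads 0
        props.set("pendingUsers", readByTaskA + 5);  // task A writes 5
        props.set("pendingUsers", readByTaskB + 3);  // task B overwrites it with 3
        return props.get("pendingUsers");
    }

    public static void main(String[] args) {
        // The intended total is 8, but task A's increment is lost.
        System.out.println(replayInterleaving()); // prints 3
    }
}
```

Every individual call is thread-safe here, yet the result is still wrong, because the read-then-write sequence as a whole is not atomic.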
You have several options on how to resolve it.
Instead of constantly updating the same entity, create a new entity for each completed task, using the time when the task was completed as an id. The advantage of this approach is that it creates an audit trail, and you can always get stats like the number of tasks completed today, within the last hour, etc. To count the number of entities you can use a keys-only query, which is almost free and very fast. The disadvantage is the higher cost of writing these entities, so this is not a good solution if you have a very large number of tasks to complete.
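A rough sketch of this idea, using a plain in-memory map in place of the datastore so it can be run anywhere. `CompletedTaskLog`, `recordCompletion` and `countCompleted` are hypothetical names, and appending the task id to the timestamp is my own assumption to keep ids distinct when two tasks finish in the same millisecond.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a datastore kind that holds one record per completed task. */
final class CompletedTaskLog {
    // Record id -> completion time in milliseconds.
    private final Map<String, Long> records = new ConcurrentHashMap<>();

    /** Writes a brand-new record; no existing record is read or modified. */
    void recordCompletion(String taskId) {
        long completedAt = System.currentTimeMillis();
        // Assumption: the task id is appended so ids stay unique within one millisecond.
        records.put(completedAt + "-" + taskId, completedAt);
    }

    /** Plays the role of the keys-only count query. */
    int countCompleted() {
        return records.size();
    }

    /** Runs taskCount tasks on several threads and returns the resulting count. */
    static int runTasks(int taskCount) {
        CompletedTaskLog log = new CompletedTaskLog();
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < taskCount; i++) {
            String taskId = "task-" + i;
            pool.execute(() -> log.recordCompletion(taskId));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("tasks did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for tasks", e);
        }
        return log.countCompleted();
    }

    public static void main(String[] args) {
        System.out.println(runTasks(1000)); // prints 1000
    }
}
```

Because each task only inserts its own record, there is no read-modify-write step left for two tasks to collide on.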
Instead of counting tasks, count the results of these tasks. For example, if a task updates a user status, you can count the number of users with "pending" status using a free and fast keys-only query. This is a very good approach if you already have an indexed property that you can use as a flag to count the tasks completed.
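As a small illustration of counting results rather than tasks, the sketch below again uses an in-memory map instead of real entities. `PendingUserCount`, the "processed" status value and the user keys are all hypothetical; in the datastore the count would come from the keys-only query on the status property.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for user entities that carry a status property. */
final class PendingUserCount {
    // User key -> status ("pending" or "processed").
    private final Map<String, String> statusByUser = new ConcurrentHashMap<>();

    void addPendingUser(String userKey) {
        statusByUser.put(userKey, "pending");
    }

    /** What a task does when it finishes a user: it only touches that user's own record. */
    void markProcessed(String userKey) {
        statusByUser.put(userKey, "processed");
    }

    /** Plays the role of the keys-only query filtered on the status property. */
    long countPending() {
        return statusByUser.values().stream().filter("pending"::equals).count();
    }

    static long demo() {
        PendingUserCount users = new PendingUserCount();
        for (int i = 0; i < 10; i++) {
            users.addPendingUser("user-" + i);
        }
        users.markProcessed("user-2");
        users.markProcessed("user-7");
        users.markProcessed("user-7"); // a repeated run of the same task changes nothing
        return users.countPending();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 8
    }
}
```

The count is derived from the current state of the users, so there is no shared counter to keep in sync, and running the same task twice cannot count a user twice.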
Upvotes: 1