Reputation: 841
I have a decider that calls an activity. The activity returns a list of validation errors. If the decider receives that list and it's not empty, I want to exit out of the workflow.
To exit the workflow I can just return in the decider, and it exits the workflow. The problem is that I'm not able to get the result from the activity. I'd have expected this to work, but it doesn't.. violations.isReady() always returns false:
public class WorkflowImpl implements Workflow {
ActivitiesClient activities;
public Workflow(/*...*/) {
// ...
}
@Override
public void do(WorkflowInput input) {
Promise<List<String>> violations = activities.validate(input);
if (!violations.isReady()) { // "do()" will be called when violations is ready.. right?
return;
} else if (!CollectionUtils.isEmpty(violations.get())) {
return; // or throw ValidationException
}
// do other stuff
}
}
I don't want to do the alternative which is to poll for violations.isReady() to be true, because this will tie up the decider thread for who knows how long. I'm not even sure if that would work.
Help?
Upvotes: 3
Views: 2502
Reputation: 6870
The workflow code is asynchronous. So a Promise returned from an activity is never ready in the same callback. So in your code violations.isReady() always returning false is correct behavior. You have to use a method annotated as @Asynchronous or Task to associate callback to a Promise. So your code should look like:
public class WorkflowImpl implements Workflow {
ActivitiesClient activities;
public Workflow(/*...*/) {
// ...
}
@Override
public void do(WorkflowInput input) {
Promise<List<String>> violations = activities.validate(input);
processValidationResult(violations);
}
@Asynchronous
private void processValidationResult(Promise<List<String>> violations) {
// As method is @Asynchronous framework guarantees violations is ready
// when its body is executed.
if (!CollectionUtils.isEmpty(violations.get())) {
return; // or throw ValidationException
}
// do other stuff
}
}
Note that AspectJ should be configured correctly with Flow Framework Aspects for @Asynchronous annotation to take effect.
Another option is to use Task directly:
public class WorkflowImpl implements Workflow {
ActivitiesClient activities;
public Workflow(/*...*/) {
// ...
}
@Override
public void do(WorkflowInput input) {
Promise<List<String>> violations = activities.validate(input);
new Task(violations) {
@Override
public void doExecute() {
// Framework guarantees violations is ready
// (as it is Task constructor parameter)
// when execute method is executed.
if (!CollectionUtils.isEmpty(violations.get())) {
return; // or throw ValidationException
}
// do other stuff
}
};
}
}
I would recommend reading through SWF Recipes and samples to get better idea about patterns used when writing SWF workflows.
Also make sure that you go through TryCatchFinally javadoc as AWS Flow Framework error handling while very powerful is quite different from what most people are used to.
Added to show example of returning value from processValidationResult(...):
@Override
public void do(WorkflowInput input) {
Promise<List<String>> violations = activities.validate(input);
Promise<String> whatever = processValidationResult(violations);
processNextStep(whatever);
}
@Asynchronous
private Promise<String> processValidationResult(Promise<List<String>> violations) {
// As method is @Asynchronous framework guarantees violations is ready
// when its body is executed.
if (!CollectionUtils.isEmpty(violations.get())) {
throw new ValidationException(...);
}
// do other stuff
return Promise.asPromise("result string");
}
@Asynchronous
private void processNextStep(Promise<String> whatever) {
...
}
Upvotes: 2