Reputation: 129
I have a question about Cadence workflows: can we invoke the same activity with different inputs inside a for loop? Will that code be deterministic? Will Cadence be able to replay the events when it reconstructs the workflow if the worker executing it is stopped during execution and restarted later?
For example, I have the following code.
func init() {
	workflow.RegisterWithOptions(SampleWorkFlow, workflow.RegisterOptions{Name: "SampleWorkFlow"})
	activity.RegisterWithOptions(SampleActivity, activity.RegisterOptions{Name: "SampleActivity"})
	activity.RegisterWithOptions(SecondActivity, activity.RegisterOptions{Name: "SecondActivity"})
}

// SampleWorkFlow comment
func SampleWorkFlow(ctx workflow.Context, input string) error {
	fmt.Println("Workflow started")
	ctx = workflow.WithTaskList(ctx, sampleTaskList)
	ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
	var result string
	err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
	if err != nil {
		return err
	}
	for i := 1; i <= 10; i++ {
		value := i
		workflow.Go(ctx, func(ctx workflow.Context) {
			err := workflow.ExecuteActivity(ctx, "SecondActivity", input, value).Get(ctx, &result)
			if err != nil {
				log.Println("err=", err)
			}
		})
	}
	return nil
}

// SampleActivity comment
func SampleActivity(ctx context.Context, value, v1 string) (string, error) {
	fmt.Println("Sample activity start")
	for i := 0; i <= 10; i++ {
		fmt.Println(i)
	}
	return "Hello " + value, nil
}

// SecondActivity takes the same two arguments the workflow passes to it (input, value).
func SecondActivity(ctx context.Context, input string, value int) (string, error) {
	fmt.Println("Second activity start")
	fmt.Println("value=", value)
	fmt.Println("Second activity going to end")
	return "Hello " + fmt.Sprintf("%d", value), nil
}
Here, SecondActivity is invoked in parallel inside a for loop. My first question is: is this code deterministic?
Let's say the worker executing this workflow terminates after 5 iterations of the loop, when i = 5. Will Cadence be able to replay the events if the workflow is picked up by another worker?
Can you please answer my question?
Upvotes: 3
Views: 1222
Reputation: 6870
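Yes, this code is deterministic. It doesn't call any non-deterministic operations (like random number or UUID generation), and it uses workflow.Go to start a goroutine rather than the native go statement. Because it is deterministic, Cadence can replay the recorded events on another worker if the original worker stops. The complexity of the code doesn't play a role in defining its determinism.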
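To make the replay question concrete, here is a deliberately simplified toy model in plain Go. It is not Cadence code and not how Cadence is implemented: the `history` type, `execute`, and `runWorkflow` are hypothetical names invented for this sketch, and the activities run sequentially for brevity. It only illustrates the idea that a restarted run executes the workflow function again from the top, receives recorded results for the calls already in the history, and does real work only for the remaining calls.

```go
package main

import "fmt"

// history is a toy stand-in for a workflow's recorded event history:
// the i-th entry is the result of the i-th activity call.
type history struct {
	results []string
	next    int // index of the next activity call in the current run
	ran     int // activities really executed in the current run
}

// execute returns the recorded result when one exists (replay);
// otherwise it runs the activity and records its result.
func (h *history) execute(activity func(int) string, arg int) string {
	if h.next < len(h.results) {
		r := h.results[h.next]
		h.next++
		return r
	}
	r := activity(arg)
	h.results = append(h.results, r)
	h.next++
	h.ran++
	return r
}

func secondActivity(v int) string { return fmt.Sprintf("Hello %d", v) }

// runWorkflow makes the same calls in the same order on every run.
// It returns early once crashAfter calls were made, simulating a worker
// that dies part way through the loop.
func runWorkflow(h *history, crashAfter int) []string {
	h.next, h.ran = 0, 0
	var out []string
	for i := 1; i <= 10; i++ {
		if i > crashAfter {
			return out
		}
		out = append(out, h.execute(secondActivity, i))
	}
	return out
}

func main() {
	h := &history{}
	runWorkflow(h, 5) // first worker stops after 5 activities
	first := h.ran
	out := runWorkflow(h, 10) // second worker starts from the top
	fmt.Println(first, h.ran, len(out), out[9]) // prints: 5 5 10 Hello 10
}
```

The second run executes only five activities because the first five results come from the recorded history. This works only because the loop issues the same calls in the same order each time, which is what determinism means here.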
Unrelated nit: there is no need to use a goroutine in your sample, as the ExecuteActivity call is already non-blocking and returns a Future.
So the sample can be simplified to:
func SampleWorkFlow(ctx workflow.Context, input string) error {
	fmt.Println("Workflow started")
	ctx = workflow.WithTaskList(ctx, sampleTaskList)
	ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
	var result string
	err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
	if err != nil {
		return err
	}
	for i := 1; i <= 10; i++ {
		workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
	}
	return nil
}
Note that this sample still might not execute the way you expect, because it completes the workflow without waiting for the activities to complete. So these activities are not even going to start.
Here is the code that waits for the activities to complete:
func SampleWorkFlow(ctx workflow.Context, input string) error {
	fmt.Println("Workflow started")
	ctx = workflow.WithTaskList(ctx, sampleTaskList)
	ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)
	var result string
	err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)
	if err != nil {
		return err
	}
	var results []workflow.Future
	for i := 1; i <= 10; i++ {
		future := workflow.ExecuteActivity(ctx, "SecondActivity", input, i)
		results = append(results, future)
	}
	for i := 0; i < 10; i++ {
		var result string
		err := results[i].Get(ctx, &result)
		if err != nil {
			log.Println("err=", err)
		}
	}
	return nil
}
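The start-everything-then-wait shape of the code above is not specific to Cadence. As a rough analogy only, here is the same pattern in plain Go, where the `future` type and the `start` and `fanOut` helpers are hypothetical names made up for this sketch. Native goroutines and channels like these must not be used inside real workflow code, where workflow.Go and workflow.Future play those roles.

```go
package main

import "fmt"

// future is a hypothetical, minimal stand-in for workflow.Future:
// a one-shot channel that will eventually hold the result.
type future chan string

// start launches task(arg) in the background and returns immediately.
func start(task func(int) string, arg int) future {
	f := make(future, 1) // buffered so the goroutine never blocks on send
	go func() { f <- task(arg) }()
	return f
}

func secondActivity(v int) string { return fmt.Sprintf("Hello %d", v) }

// fanOut starts n tasks first, then waits for each in submission order.
func fanOut(n int) []string {
	futures := make([]future, 0, n)
	for i := 1; i <= n; i++ {
		futures = append(futures, start(secondActivity, i))
	}
	results := make([]string, 0, n)
	for _, f := range futures {
		results = append(results, <-f) // blocks until this task is done
	}
	return results
}

func main() {
	fmt.Println(fanOut(3)) // prints: [Hello 1 Hello 2 Hello 3]
}
```

Results are collected in submission order regardless of which task finishes first, and the function cannot return before every task has delivered its result.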
Upvotes: 2