martinni39
martinni39

Reputation: 323

Sending external events to a workflow

In our cadence workflow we often need to wait a certain amount of time for external events before continuing (ie Email read, link clicked, etc..).

I was wondering what was the best way to notify our workflows of these events. Are signals the right way, or should we create an activity that would wait for the event?

From what I've seen we need to create a signal channel ch := workflow.GetSignalChannel(ctx, SignalName), however the context is not available in activities.

Upvotes: 3

Views: 1075

Answers (1)

Maxim Fateev
Maxim Fateev

Reputation: 6890

Signaling is the recommended way for sending events to workflows.

The frequently used pattern for Go workflows is to use Selector to wait for multiple signal channels as well as a timer future.

Go sample:

sig1Ch := workflow.GetSignalChannel(ctx, "signal1")
sig2Ch := workflow.GetSignalChannel(ctx, "signal2")
timeout := workflow.NewTimer(ctx, time.Minute * 30)

s := workflow.NewSelector(ctx)

var signal1 *Signal1Struct
var signal2 *Signal2Struct
s.AddFuture(timeout, func(f Future) {
})
s.AddReceive(sig1Ch, func(c Channel, more bool) {
    c.Receive(ctx, signal1)
})
s.AddReceive(sig2Ch, func(c Channel, more bool) {
    c.Receive(ctx, signal2)
})

s.Select(ctx)

if signal1 == nil && signal2 == nil {
   // handle timeout
} else {
  // process signals
}

Java sample:

public interface MyWorkflow {

    @WorkflowMethod
    void main();

    @SignalMethod
    void signal1(Signal1Struct signal);

    @SignalMethod
    void signal2(Signal2Struct signal);

}

public class MyWorkflowImpl implements MyWorkflow {

    private Signal1Struct signal1;
    private Signal2Struct signal2;

    @Override
    public void main() {
        Workflow.await(Duration.ofMinutes(30), 
            () -> signal1 != null || signal2 != null);

        if (signal1 == null && signal2 == null) {
            // handle timeout
        }
        // process signals
    }

    @Override
    public void signal1(Signal1Struct signal) {
        signal1 = signal;
    }

    @Override
    public void signal2(Signal2Struct signal) {
        signal2 = signal;
    }
}

Note that it is a good idea to account for the workflow worker outages. For example let's imagine that the above workflow is started and a signal is received 40 minutes after the start while all workflow workers are down. In this case when workers are brought back both timeout future and signCh would be not empty. As Selector doesn't guarantee ordering it is possible that the signal is delivered before the timer even if it was received after. So your code logic should account for this. For example there is a hard requirement that a signal received after the 30 minutes since the workflow start must be ignored. Then the above sample has to be modified to:

Go sample:

...
start := workflow.Now(ctx); // must use workflow clock
s.Select(ctx)
duration := workflow.Now(ctx).Sub(start)
if duration.Minutes() >= 30 || (signal1 == nil && signal2 == nil) {
   // handle timeout
} else {
  // process signals
}

Java sample:

public void main() {
    long start = Workflow.currentTimeMillis(); // must use workflow clock
    Duration timeout = Duration.ofMinutes(30);
    Workflow.await(timeout, () -> signal1 != null || signal2 != null);
    long duration = Workflow.currentTimeMillis() - start;
    if (timeout.toMillis() <= duration || (signal1 == null && signal2 == null)) {
        // handle timeout
    }
    // process signals
}

The updated code behaves correctly even if the workflow execution was delayed for an hour.

Edit: Added signal sending sample

Go sample:

c, err := client.NewClient(client.Options{
    HostPort: client.DefaultHostPort,
})
if err != nil {
    log.Fatalln("Unable to create client", err)
}
defer c.Close()

err := c.SignalWorkflow(context.Background(), <workflowId>, "", "signal1", Signal1Struct{})

Java sample:

WorkflowServiceStubs service = WorkflowServiceStubs.newInstance();
WorkflowClient client = WorkflowClient.newInstance(service);
GreetingWorkflow myWorkflow = 
client.newWorkflowStub(MyWorkflow.class, <workflowId>);
myWorkflow.signal1(new Signal1Struct());

Upvotes: 5

Related Questions