user1189332

Reputation: 1941

Temporal.io workflow engine: does it block a thread while calling an activity?

Newbie to Temporal. This is my workflow method code using the Java SDK.

Map<String, String> activityResult = customActivity.fetchCustomMap();
Workflow.sleep(Duration.ofSeconds(5));
Map<String, String> activityResult1 = customActivity.fetchCustomMap();
System.out.println(Thread.currentThread() + "activity result " + activityResult);
System.out.println(Thread.currentThread() + "activity result1 " + activityResult1);
Workflow.sleep(Duration.ofSeconds(2));
System.out.println(Thread.currentThread() + "workflow execution ends");
Workflow.sleep(Duration.ofSeconds(2));

Just wanted to understand: when an activity method is called, it appears as if the thread is blocked (it works like a synchronous request/response).

How does it scale? If this is all synchronous, does this mean a workflow is essentially pinned to one worker instance?

How does the activity call look if it has to be async and needs to be distributed to different workers?

If that happens, how does caching work in the case of replays?

Does a different worker need to know which previous activities have been executed?

[Edit - Added more info]

In the above code snippet, when a thread T1 is executing this line:

Map<String, String> activityResult = customActivity.fetchCustomMap();

My understanding (please correct me if something is incorrect) is that the call is intercepted by the SDK, which makes a remote call (via events) to the Temporal Server to schedule the activity execution.

The Temporal server then emits an activity event for worker(s) to consume and execute.

It is then picked up by a worker (possibly a different one, if multiple worker instances are running), which executes the activity.

Once the activity execution is finished, the SDK intercepts the result and makes a remote call to mark the completion and record it in the history.

Then, another workflow event is scheduled, emitted, and consumed by the first worker that initiated the activity call (from the workflow), and the workflow resumes from there.

The question I was having is: is the thread T1 (where the activity call was initially forked) blocked until this round trip finishes?

Upvotes: 1

Views: 3042

Answers (2)

James

Reputation: 4731

My understanding (please correct me if something is incorrect) is that the call is intercepted by the SDK, which makes a remote call (via events) to the Temporal Server to schedule the activity execution.

The Temporal server then emits an activity event for worker(s) to consume and execute.

It is then picked up by a worker (possibly a different one, if multiple worker instances are running), which executes the activity.

Once the activity execution is finished, the SDK intercepts the result and makes a remote call to mark the completion and record it in the history.

Then, another workflow event is scheduled, emitted, and consumed by the first worker that initiated the activity call (from the workflow), and the workflow resumes from there.

What you describe here is pretty much correct.

Things like Workflow.sleep() and customActivity.fetchCustomMap() generate Workflow Commands, which the Worker sends back to the Temporal server on completion of the Workflow Task. When the timer fires, or the activity completes, fails, or times out, an appropriate workflow event is appended to the Workflow History, and a new Workflow Task gets scheduled.
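To make this concrete, here is a sketch of the abbreviated, hypothetical event history that the first few lines of the sample workflow would produce. The event type names are Temporal's real history event types, but the exact sequence is simplified:

```java
import java.util.List;

public class HistorySketch {
    // Hypothetical, abbreviated history for the sample workflow:
    // one activity call, then a 5-second timer, and so on.
    static List<String> sampleHistory() {
        return List.of(
            "WorkflowExecutionStarted",
            "WorkflowTaskScheduled",
            "WorkflowTaskStarted",
            "WorkflowTaskCompleted",   // emitted the ScheduleActivityTask command
            "ActivityTaskScheduled",   // first customActivity.fetchCustomMap()
            "ActivityTaskStarted",
            "ActivityTaskCompleted",   // activity result recorded here, reused on replay
            "WorkflowTaskScheduled",
            "WorkflowTaskStarted",
            "WorkflowTaskCompleted",   // emitted the StartTimer command
            "TimerStarted",            // Workflow.sleep(Duration.ofSeconds(5))
            "TimerFired"
            // ... and so on for the second activity call and remaining timers
        );
    }

    public static void main(String[] args) {
        sampleHistory().forEach(System.out::println);
    }
}
```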

The question I was having is: is the thread T1 (where the activity call was initially forked) blocked until this round trip finishes?

The question you are asking is essentially: what happens to the Java thread that was executing that Workflow code once the Workflow Task has completed? Does it consume resources on the Worker until the next Workflow Task comes in (e.g. with the return value of the activity or a notification that the timer has fired)?

Perhaps surprisingly, if there were no workflow caching, the answer to this question would simply be no. The thread would simply get "killed" after sending out the Workflow Task completion to the server. Then, when a new Workflow Task came in, a new thread would be started to execute the Workflow code again, replaying past history up to the previous Workflow Task, then finally executing the new Workflow Task. And once that new Workflow Task completed, the thread would again get "killed", freeing up the resources associated with that Workflow Execution.

Replaying a Workflow History is possible for two reasons: 1) your workflow code itself is only allowed to contain deterministic logic, and 2) anything non-deterministic (or external to the workflow logic) that happened previously has been permanently written to the Workflow History. Therefore, any worker that possesses that workflow code and the Workflow Execution History will follow exactly the same path through the code, and will therefore generate the same Workflow Commands (i.e. call the same activities with the same arguments, set the same timers, etc.).

Now, while replaying the Workflow History, the SDK will match workflow commands emitted by the Workflow code being executed with events recorded in the Workflow History, and use these events to "immediately" resolve these commands (i.e. without making a new round trip to the server, or actually re-executing the command). For example, in your sample workflow, the first call to customActivity.fetchCustomMap() will immediately return the return value of that activity, as recorded in the Workflow History.
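As a rough plain-Java analogy (no Temporal APIs; every name below is invented for illustration), replay can be pictured as deterministic code asking a runtime for each command's result, with the runtime answering from recorded history instead of re-executing anything:

```java
import java.util.ArrayList;
import java.util.List;

public class ReplaySketch {
    // Recorded results of previously executed commands, keyed by the
    // deterministic order in which the workflow code emits them.
    private final List<String> recordedHistory;
    private int nextCommandId = 0;

    ReplaySketch(List<String> recordedHistory) {
        this.recordedHistory = recordedHistory;
    }

    // The "activity call": emits a command, then resolves it from history
    // if a recorded result exists (no re-execution, no server round trip).
    String executeActivity(String name) {
        int id = nextCommandId++;
        if (id < recordedHistory.size()) {
            return recordedHistory.get(id); // resolved "immediately" on replay
        }
        throw new IllegalStateException("would block and wait for a real result");
    }

    // Deterministic workflow code: always emits the same commands in the same order.
    static List<String> runWorkflow(ReplaySketch runtime) {
        List<String> results = new ArrayList<>();
        results.add(runtime.executeActivity("fetchCustomMap"));
        results.add(runtime.executeActivity("fetchCustomMap"));
        return results;
    }

    public static void main(String[] args) {
        // Two independent "workers" replaying the same history through the
        // same code reach exactly the same state.
        List<String> history = List.of("result-A", "result-B");
        List<String> r1 = runWorkflow(new ReplaySketch(history));
        List<String> r2 = runWorkflow(new ReplaySketch(history));
        System.out.println(r1.equals(r2)); // prints true
    }
}
```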

I'm obviously simplifying the process and intentionally skipping over many low level details, but the key idea is this: a Workflow runtime state can be fully restored, on any worker, by replaying the same Workflow History through the same Workflow code.

Of course, there are performance costs associated with replaying a workflow execution, e.g. the time it takes for the worker to fetch the complete workflow history from the Temporal server. That's why workers maintain a cache of workflow executions; when a new workflow task comes in for a cached workflow execution, the worker can simply "resume" that execution, without having to replay.
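A loose analogy for that cache, in plain Java with invented names (the real cache holds live workflow executions and their threads, not strings): a bounded LRU map where a miss forces the expensive replay path and a hit resumes for free.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class WorkflowCacheSketch {
    // Analog of the worker's workflow-execution cache: bounded, LRU-evicting.
    static class Cache extends LinkedHashMap<String, String> {
        private final int capacity;
        Cache(int capacity) {
            super(16, 0.75f, true); // access-order for LRU behaviour
            this.capacity = capacity;
        }
        @Override
        protected boolean removeEldestEntry(Map.Entry<String, String> eldest) {
            return size() > capacity; // eviction frees the workflow's resources
        }
    }

    int replayCount = 0;

    // On a new workflow task: resume from cache if present, otherwise fetch
    // and replay the full history to rebuild state (the expensive path).
    String getOrReplay(Cache cache, String workflowId) {
        String state = cache.get(workflowId);
        if (state == null) {
            replayCount++; // stands in for fetching + replaying history
            state = "state-of-" + workflowId;
            cache.put(workflowId, state);
        }
        return state;
    }

    public static void main(String[] args) {
        WorkflowCacheSketch s = new WorkflowCacheSketch();
        Cache cache = new Cache(1);   // tiny cache to force eviction
        s.getOrReplay(cache, "wf-1"); // miss: replay
        s.getOrReplay(cache, "wf-1"); // hit: resume without replay
        s.getOrReplay(cache, "wf-2"); // miss: evicts wf-1
        s.getOrReplay(cache, "wf-1"); // miss again: replay needed
        System.out.println(s.replayCount); // prints 3
    }
}
```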

Now, back to your actual question…

How does it scale? If this is all synchronous, does this mean a workflow is essentially pinned to one worker instance?

Yes, given the Java threading model, each cached workflow in the Java SDK indeed corresponds to some blocked threads (possibly more than one, though you don't need to worry about thread safety inside your workflow code).

However, as Maxim mentioned:

Workflow is cached on a worker instance as a performance optimization. So if it is blocked for a long time, it is usually pushed out of the cache and recovered on another worker when the time comes to continue. This way you can have many more blocked workflows than a single worker can keep in memory or has threads to process.

That is, a cached Workflow may get evicted from cache by the worker, for instance if the cache is full, if an error happens while reporting the workflow task completion back to the server (in which case the cached workflow's state would be out of sync, and must therefore be discarded), or if the worker shuts down.

When the worker receives the signal that the activity is finished (via an event), the blocked thread resumes and the execution moves on to the next line in the workflow function. [...] how is the thread paused and resumed (upon an external signal)? What's the Java concurrency construct used (just out of interest)?

Internally, workflow commands are really handled asynchronously, so let's start with that case. Let's suppose you schedule an activity using the following syntax: Async.function(customActivity::fetchCustomMap). What happens under the hood is: 1) a Schedule Activity command is added to the current Workflow Task with a unique number (it will actually be sent to the server at the end of the current workflow task, which happens once the worker detects that it is no longer possible to make any progress, i.e. that all promises are blocked). Then, 2) a Promise object is created (similar to Java's Future; more on this later) and added to a map of pending async completions (the key being the command's unique number). That is the Promise object returned by Async.function(). Later, when the worker receives the result of that activity as part of a subsequent workflow task, it looks up the corresponding promise in that map and marks it as completed (with the proper value), which unblocks anything that was waiting on it.
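That pending-completions mechanism can be sketched in plain Java, with CompletableFuture standing in for Temporal's Promise (class and method names below are invented for illustration, not the SDK's internals):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class PendingCompletionsSketch {
    private int nextCommandId = 0;
    // Map of pending async completions, keyed by command number.
    private final Map<Integer, CompletableFuture<String>> pending = new HashMap<>();

    // Analog of Async.function(customActivity::fetchCustomMap): record a
    // numbered command and hand back a promise to be resolved later.
    CompletableFuture<String> scheduleActivity() {
        int id = nextCommandId++;
        CompletableFuture<String> promise = new CompletableFuture<>();
        pending.put(id, promise);
        return promise;
    }

    // Analog of a later workflow task delivering the activity's result:
    // look up the promise by command number and complete it.
    void deliverResult(int commandId, String result) {
        pending.remove(commandId).complete(result);
    }

    public static void main(String[] args) {
        PendingCompletionsSketch sketch = new PendingCompletionsSketch();
        CompletableFuture<String> promise = sketch.scheduleActivity();
        System.out.println(promise.isDone());   // false: still pending
        sketch.deliverResult(0, "activity result");
        System.out.println(promise.join());     // activity result
    }
}
```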

When starting an activity using the synchronous syntax (e.g. result = customActivity.fetchCustomMap()), the only difference is that the activity stub calls get() on the Promise, which makes the current thread yield control until the promise is resolved (similar to Java's Future.get()). IIRC, when a workflow gets evicted from the cache, the worker resolves all pending promises of that workflow with a DestroyWorkflowThreadError, and any further attempt to call some Workflow APIs from that workflow also results in that error being thrown. This is how workflow threads get destroyed.
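Both behaviours, blocking on the result and being unblocked exceptionally at eviction time, can be mimicked with plain Java threads and a CompletableFuture (again an analogy with invented names, not the SDK's actual implementation):

```java
import java.util.concurrent.CompletableFuture;

public class BlockingGetSketch {
    // A "workflow thread" that blocks in get(), like the synchronous
    // activity stub; returns what happened to it once it finishes.
    static String runScenario(boolean evict) throws InterruptedException {
        CompletableFuture<String> promise = new CompletableFuture<>();
        final String[] outcome = new String[1];

        Thread workflowThread = new Thread(() -> {
            try {
                // Blocks here, like thread T1 waiting for the activity result.
                outcome[0] = "resumed with: " + promise.get();
            } catch (Exception e) {
                // Analog of cache eviction: the pending promise is failed,
                // which unwinds and "destroys" the workflow thread.
                outcome[0] = "thread destroyed: " + e.getCause().getMessage();
            }
        });
        workflowThread.start();

        if (evict) {
            // Analog of resolving pending promises with DestroyWorkflowThreadError.
            promise.completeExceptionally(new RuntimeException("DestroyWorkflowThreadError"));
        } else {
            // Analog of a later workflow task delivering the activity result.
            promise.complete("activity result");
        }
        workflowThread.join();
        return outcome[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runScenario(false)); // resumed with: activity result
        System.out.println(runScenario(true));  // thread destroyed: DestroyWorkflowThreadError
    }
}
```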

Note that Temporal's Java SDK provides its own implementations of most concurrency primitives (e.g. Promise and CompletablePromise rather than Future and CompletableFuture; Workflow.sleep instead of Thread.sleep; WorkflowQueue instead of BlockingQueue; etc.). There are a few reasons for that, notably that the SDK needs to guarantee the execution order of async completions (i.e. normal thread execution order is non-deterministic, and the same is true for tasks in an executor), and because these alternate primitives are aware of Temporal-specific constraints. For example, WorkflowQueue doesn't need to acquire a lock, as Temporal already guarantees that only one thread may execute on a given workflow at any time. Also, it implements its blocking logic using a Workflow condition rather than a Lock condition, which would block the thread in a way that wouldn't yield control back to the worker's deterministic task scheduler.

If you are interested in knowing more about some of the particular challenges faced in implementing Temporal's deterministic execution runtime in various languages, please have a look at the following YouTube video: Ensuring Deterministic Execution in Modern Language Runtimes.

Upvotes: 5

Maxim Fateev

Reputation: 6890

How does it scale? If this is all synchronous, does this mean a workflow is essentially pinned to one worker instance?

Workflow is not pinned to the worker instance. Otherwise, a workflow would not be able to continue if that worker instance died. Workflow is cached on a worker instance as a performance optimization. So if it is blocked for a long time, it is usually pushed out of the cache and recovered on another worker when the time comes to continue. This way you can have many more blocked workflows than a single worker can keep in memory or has threads to process.

How does the activity call look if it has to be async and needs to be distributed to different workers?

Activity invocation is internally always async, and by default activity tasks are delivered to workers through a task queue. So an activity can end up on a different worker each time it is executed or retried.
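A plain-Java analogy of this dispatch model (invented names; Temporal's actual task queues live on the server, not in a worker's memory): several worker threads polling one shared queue, with any task free to land on any worker.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class TaskQueueSketch {
    private static final String STOP = "STOP";

    // Analog of a Temporal task queue: any polling worker may pick up any
    // activity task, so each execution or retry can land on a different worker.
    static int runTasks(int numTasks, int numWorkers) throws InterruptedException {
        BlockingQueue<String> taskQueue = new LinkedBlockingQueue<>();
        AtomicInteger executed = new AtomicInteger();

        Runnable worker = () -> {
            try {
                while (true) {
                    String task = taskQueue.take(); // poll the task queue
                    if (task.equals(STOP)) return;  // shutdown sentinel
                    executed.incrementAndGet();     // "execute" the activity
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        Thread[] workers = new Thread[numWorkers];
        for (int i = 0; i < numWorkers; i++) {
            workers[i] = new Thread(worker, "worker-" + i);
            workers[i].start();
        }
        for (int i = 0; i < numTasks; i++) taskQueue.put("activity-task-" + i);
        for (int i = 0; i < numWorkers; i++) taskQueue.put(STOP);
        for (Thread w : workers) w.join();
        return executed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // All tasks complete regardless of which worker takes each one.
        System.out.println(runTasks(10, 3)); // prints 10
    }
}
```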

Does a different worker need to know which previous activities have been executed?

I don't understand the question. The activity worker is fully stateless. Each activity execution is completely independent.

Upvotes: 3
