rabble

Reputation: 117

Cadence throwing WorkflowRejectedExecutionError when executing workflow with many child workflows/activities

I am evaluating the use of Cadence for performing long-running bulk actions. I have the following (Kotlin) code:

class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

    private val changeNamePromises = mutableListOf<Promise<SetNameResult>>()

    override fun updateNames(newName: String, entityIds: Collection<String>) {
        // Start one child workflow per entity, all in parallel.
        entityIds.forEach { entityId ->
            val childWorkflow = Workflow.newChildWorkflowStub(
                    UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
            )
            val promise = Async.function(childWorkflow::setName, newName, entityId)

            changeNamePromises.add(promise)
        }

        // Block until every child workflow has completed.
        val allDone = Promise.allOf(changeNamePromises)
        allDone.get()
    }

    class UpdateNameSingleWorkflowImpl : UpdateNameBulkWorkflow.UpdateNameSingleWorkflow {
        // Activity stub; the activity interface is not shown in the question,
        // so NameActivities is a stand-in name here.
        private val activities = Workflow.newActivityStub(NameActivities::class.java)

        override fun setName(newName: String, entityId: String): SetNameResult {
            return Async.function(activities::setName, newName, entityId).get()
        }
    }
}

This works fine for smaller numbers of entities, but I quickly run into the following exception:

java.lang.RuntimeException: Failure processing decision task. WorkflowID=b5327d20-6ea6-4aba-b863-2165cb21e038, RunID=c85e2278-e483-4c81-8def-f0cc0bd309fd
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:283) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:229) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:76) ~[cadence-client-2.7.4.jar:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.uber.cadence.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f17a605[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7fa9f240[Wrapped task = com.uber.cadence.internal.sync.WorkflowThreadImpl$RunnableWrapper@1a27000b]] rejected from java.util.concurrent.ThreadPoolExecutor@22188bd0[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 2400]
    at com.uber.cadence.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:281) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.sync.AsyncInternal.execute(AsyncInternal.java:300) ~[cadence-client-2.7.4.jar:na]
    at com.uber.cadence.internal.sync.AsyncInternal.function(AsyncInternal.java:111) ~[cadence-client-2.7.4.jar:na]
...

It appears that each Async.function call consumes a workflow thread, so I quickly exhaust the worker's workflow thread pool (600 threads in the trace above) and Cadence is unable to schedule new tasks.

I have worked around this by changing the definition of updateNames to:

    override fun updateNames(newName: String, entityIds: Collection<String>) {

        entityIds.chunked(200).forEach { sublist ->
            // Start at most 200 child workflows at a time.
            val promises = sublist.map { entityId ->
                val childWorkflow = Workflow.newChildWorkflowStub(
                        UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
                )
                Async.function(childWorkflow::setName, newName, entityId)
            }

            // Wait for the whole chunk to finish before starting the next one.
            val allDone = Promise.allOf(promises)
            allDone.get()
        }
    }

This processes the items in chunks of 200, waiting for each chunk to complete before moving on to the next one. I have concerns about how well this will perform (a single error in a chunk will stall processing of all records in the following chunks while it is retried). I'm also concerned about how well Cadence will be able to recover the progress of this function in the event of a crash.

My question is: Is there an idiomatic Cadence way of doing this that doesn't cause this immediate resource exhaustion? Am I using the wrong technology or is this just a naive approach?

Upvotes: 0

Views: 799

Answers (1)

Maxim Fateev

Reputation: 6870

A Cadence workflow has a relatively small limit on the size of a single workflow run; Cadence scales out with the number of parallel workflow runs. Executing a very large number of tasks in a single run is therefore an anti-pattern.

The idiomatic solutions are:

  • Run a chunk of limited size and then call continue-as-new. This bounds the size of any single run (see the first sketch below).
  • Use hierarchical workflows. A single parent with 1,000 children, each executing 1,000 activities, runs 1 million activities while keeping every workflow history bounded (see the second sketch below).
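For the first option, here is a minimal sketch against the question's interfaces, assuming a batch size of 200 and using the Java client's continue-as-new stub. Each run processes one bounded batch, then restarts the workflow with the remaining ids as fresh arguments:

    class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

        override fun updateNames(newName: String, entityIds: Collection<String>) {
            // Process only a bounded batch in this run (200 is an assumed size).
            val batch = entityIds.take(200)
            val promises = batch.map { entityId ->
                val childWorkflow = Workflow.newChildWorkflowStub(
                        UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
                )
                Async.function(childWorkflow::setName, newName, entityId)
            }
            Promise.allOf(promises).get()

            // If ids remain, complete this run and continue as a new run with
            // an empty history, carrying the remainder as the new arguments.
            val remaining = entityIds.drop(200)
            if (remaining.isNotEmpty()) {
                val nextRun = Workflow.newContinueAsNewStub(UpdateNameBulkWorkflow::class.java)
                nextRun.updateNames(newName, remaining)
            }
        }
    }

This also addresses the recovery concern: after a crash, replay is bounded to the current run, because every run starts with the not-yet-processed ids as its arguments.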
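For the second option, a sketch of the hierarchical shape. The UpdateNamePartitionWorkflow child interface and the partition size of 1,000 are illustrative assumptions, not part of the question:

    interface UpdateNamePartitionWorkflow {
        @WorkflowMethod
        fun updateNames(newName: String, entityIds: List<String>)
    }

    class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

        override fun updateNames(newName: String, entityIds: Collection<String>) {
            // Give each child a bounded slice; each child runs its slice as
            // activities, so no single history (parent or child) grows large.
            val promises = entityIds.chunked(1_000).map { partition ->
                val child = Workflow.newChildWorkflowStub(
                        UpdateNamePartitionWorkflow::class.java
                )
                Async.procedure(child::updateNames, newName, partition)
            }
            Promise.allOf(promises).get()
        }
    }

If the number of children approaches the worker's workflow thread limit, the parent can start them in bounded waves, as in the question's workaround; the histories stay bounded either way.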

Upvotes: 1
