Reputation: 117
I am evaluating the use of Cadence for performing long-running bulk actions. I have the following (Kotlin) code:
class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {

    private val changeNamePromises = mutableListOf<Promise<SetNameResult>>()

    override fun updateNames(newName: String, entityIds: Collection<String>) {
        entityIds.forEach { entityId ->
            val childWorkflow = Workflow.newChildWorkflowStub(
                UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
            )
            // Each Async.function call runs the child invocation on its own workflow thread.
            val promise = Async.function(childWorkflow::setName, newName, entityId)
            changeNamePromises.add(promise)
        }
        val allDone = Promise.allOf(changeNamePromises)
        allDone.get()
    }

    class UpdateNameSingleWorkflowImpl : UpdateNameBulkWorkflow.UpdateNameSingleWorkflow {
        override fun setName(newName: String, entityId: String): SetNameResult {
            // `activities` is an activity stub (created via Workflow.newActivityStub), omitted here for brevity.
            return Async.function(activities::setName, newName, entityId).get()
        }
    }
}
This works fine for smaller numbers of entities, but I quickly run into the following exception:
java.lang.RuntimeException: Failure processing decision task. WorkflowID=b5327d20-6ea6-4aba-b863-2165cb21e038, RunID=c85e2278-e483-4c81-8def-f0cc0bd309fd
at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:283) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:229) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:76) ~[cadence-client-2.7.4.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.uber.cadence.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f17a605[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7fa9f240[Wrapped task = com.uber.cadence.internal.sync.WorkflowThreadImpl$RunnableWrapper@1a27000b]] rejected from java.util.concurrent.ThreadPoolExecutor@22188bd0[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 2400]
at com.uber.cadence.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:281) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.sync.AsyncInternal.execute(AsyncInternal.java:300) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.sync.AsyncInternal.function(AsyncInternal.java:111) ~[cadence-client-2.7.4.jar:na]
...
It appears that I am quickly exhausting the thread pool and Cadence is unable to schedule new tasks.
I have worked around this by changing the definition of updateNames to:
override fun updateNames(newName: String, entityIds: Collection<String>) {
    entityIds.chunked(200).forEach { sublist ->
        val promises = sublist.map { entityId ->
            val childWorkflow = Workflow.newChildWorkflowStub(
                UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
            )
            Async.function(childWorkflow::setName, newName, entityId)
        }
        val allDone = Promise.allOf(promises)
        allDone.get()
    }
}
This processes the items in chunks of 200 and waits for each chunk to complete before moving on to the next one. I have concerns about how well this will perform: a single failing entity in a chunk blocks all records in the following chunks while it is retried. I'm also concerned about how well Cadence will be able to recover the progress of this function in the event of a crash.
My question is: Is there an idiomatic Cadence way of doing this that doesn't cause this immediate resource exhaustion? Am I using the wrong technology or is this just a naive approach?
Upvotes: 0
Views: 799
Reputation: 6870
A Cadence workflow has a relatively small limit on the size of a single workflow run; Cadence scales out with the number of parallel workflow runs. Executing a very large number of tasks in a single workflow run is therefore an anti-pattern.
The idiomatic solutions are:
- Keep only a bounded number of child workflows running at a time, starting a new one as an earlier one completes (a sliding window, see the sketch below), instead of launching them all at once.
- Partition the work across multiple workflow runs, e.g. a parent workflow that starts child workflows which each handle a chunk of the entities, so no single run accumulates a huge history.
- Use continue-as-new to process the collection in batches, passing the remaining entity ids to a fresh run so history stays bounded and progress survives worker crashes.
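As a minimal sketch of the sliding-window option (not code from this answer; the window size of 200 and the bookkeeping with Promise.anyOf and isCompleted are assumptions), updateNames could bound the number of in-flight children like this:

override fun updateNames(newName: String, entityIds: Collection<String>) {
    val windowSize = 200  // assumed cap on concurrently running child workflows
    val inFlight = mutableListOf<Promise<SetNameResult>>()

    entityIds.forEach { entityId ->
        if (inFlight.size >= windowSize) {
            // Wait until at least one child finishes, then drop completed promises from the window.
            Promise.anyOf(inFlight).get()
            inFlight.removeAll { it.isCompleted }
        }
        val childWorkflow = Workflow.newChildWorkflowStub(
            UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
        )
        inFlight.add(Async.function(childWorkflow::setName, newName, entityId))
    }
    // Drain whatever is still running at the end.
    Promise.allOf(inFlight).get()
}

Note that this only bounds the number of concurrent workflow threads; the history of the single run still grows with the total number of entities, so for very large collections the chunk-per-child-workflow or continue-as-new approaches above are the better fit.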
Upvotes: 1