長時間実行される一括アクションを実行するために Cadence の使用を評価しています。次の(Kotlin)コードがあります:
class UpdateNameBulkWorkflowImpl : UpdateNameBulkWorkflow {
private val changeNamePromises = mutableListOf<Promise<ChangeNameResult>>()
override fun updateNames(newName: String, entityIds: Collection<String>) {
entityIds.forEach { entityId ->
val childWorkflow = Workflow.newChildWorkflowStub(
UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
)
val promise = Async.function(childWorkflow::setName, newName, entityId)
changeNamePromises.add(promise)
}
val allDone = Promise.allOf(changeNamePromises)
allDone.get()
}
class UpdateNameSingleWorkflowImpl : UpdateNameBulkWorkflow.UpdateNameSingleWorkflow {
override fun setName(newName: String, entityId: String): SetNameResult {
return Async.function(activities::setName, newName, entityId).get()
}
}
}
これはエンティティの数が少ない場合は問題なく機能しますが、すぐに次の例外に遭遇します。
java.lang.RuntimeException: Failure processing decision task. WorkflowID=b5327d20-6ea6-4aba-b863-2165cb21e038, RunID=c85e2278-e483-4c81-8def-f0cc0bd309fd
at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:283) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.worker.WorkflowWorker$TaskHandlerImpl.wrapFailure(WorkflowWorker.java:229) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.worker.PollTaskExecutor.lambda$process$0(PollTaskExecutor.java:76) ~[cadence-client-2.7.4.jar:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.uber.cadence.internal.sync.WorkflowRejectedExecutionError: java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@7f17a605[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@7fa9f240[Wrapped task = com.uber.cadence.internal.sync.WorkflowThreadImpl$RunnableWrapper@1a27000b]] rejected from java.util.concurrent.ThreadPoolExecutor@22188bd0[Running, pool size = 600, active threads = 600, queued tasks = 0, completed tasks = 2400]
at com.uber.cadence.internal.sync.WorkflowThreadImpl.start(WorkflowThreadImpl.java:281) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.sync.AsyncInternal.execute(AsyncInternal.java:300) ~[cadence-client-2.7.4.jar:na]
at com.uber.cadence.internal.sync.AsyncInternal.function(AsyncInternal.java:111) ~[cadence-client-2.7.4.jar:na]
...
スレッド プールが急速に使い果たされているようで、Cadence は新しいタスクをスケジュールできません。
の定義をupdateNames
次のように変更することで、これを回避しました。
override fun updateNames(newName: String, entityIds: Collection<String>) {
entityIds.chunked(200).forEach { sublist ->
val promises = sublist.map { entityId ->
val childWorkflow = Workflow.newChildWorkflowStub(
UpdateNameBulkWorkflow.UpdateNameSingleWorkflow::class.java
)
Async.function(childWorkflow::setName, newName, entityId)
}
val allDone = Promise.allOf(promises)
allDone.get()
}
}
これは基本的にアイテムを 200 個のチャンクで処理し、各チャンクが完了するのを待ってから次のチャンクに移動します。これがどれだけうまく機能するかについて懸念があります (チャンクでエラーが 1 回発生すると、再試行中に次のチャンクのすべてのレコードの処理が停止します)。また、クラッシュが発生した場合に、ケイデンスがこの機能の進行状況をどれだけうまく回復できるかにも関心があります。
私の質問は次のとおりです。この即時のリソースの枯渇を引き起こさない慣用的なケイデンスの方法はありますか? 間違ったテクノロジーを使用していますか、それとも単純なアプローチですか?