2

当社は現在、データ ワークフロー (ELT、レポート生成、ML など) にprefect.ioを活用しています。Daskを利用した並列タスク実行機能の追加を開始したところです。私たちのフローは、一時的なAWS Fargateコンテナを使用して実行されます。このコンテナは、LocalCluster オブジェクトに渡される特定の数のワーカー、スレッド、プロセスとともに Dask LocalCluster を使用します。

Dask での旅は次のようになります。

  1. 許可されている最大 cpu/メモリが増えるまで、単一マシンの LocalCluster を使い続けます
  2. 1 つのコンテナーを拡張するときは、最初のコンテナー ( dask-kubernetes のように) に追加のワーカー コンテナーを生成し、それらを LocalCluster に結合します。

現在、256 cpu (.25 vCPU) と 512 メモリを備えたコンテナーから始めて、LocalCluster を 1 つの n_workers と 3 つの threads_per_worker に固定して、妥当な量の並列処理を実現しています。ただし、これは本当に当て推量です。Fargate で他の python ベースのアプリケーションを実行した以前の経験に基づいて、1 vcpu と 3 スレッド未満のマシンであるため、1 n_workers です。これは、アイテムのリストに対して関数をマップするだけの非常に単純な例ではうまくいくようです。

RENEWAL_TABLES = [
'Activity',
'CurrentPolicyTermStaus',
'PolicyRenewalStatus',
'PolicyTerm',
'PolicyTermStatus',
'EndorsementPolicyTerm',
'PolicyLifeState'
]

RENEWAL_TABLES_PAIRS = [
    (i, 1433 + idx) for idx, i in enumerate(RENEWAL_TABLES)
]


@task(state_handlers=[HANDLER])
def dummy_step():
    LOGGER.info('Dummy Step...')
    sleep(15)


@task(state_handlers=[HANDLER])
def test_map(table):
    LOGGER.info('table: {}...'.format(table))
    sleep(15)


with Flow(Path(__file__).stem, SCHEDULE, state_handlers=[HANDLER]) as flow:
    first_step = dummy_step()
    test_map.map(RENEWAL_TABLES_PAIRS).set_dependencies(upstream_tasks=[first_step])

一度に実行されるタスクは 3 つしかありません。

リモートワーカーを追加するために単一マシンのサイズを拡張するときに、n_workers(単一マシン)、スレッド、プロセスを最適に構成する方法を本当に理解したいと思います。ワークロードに依存することはわかっていますが、1 つのタスクがデータベースから csv への抽出を行い、別のタスクが pandas 計算を実行する単一のフローで物事の組み合わせを見ることができます。オンラインで、スレッド = ドキュメントに要求された CPU の数のように見えるものを見てきましたが、Fargate では 1 つ未満の CPU で並列処理を実現できるようです。

フィードバックをいただければ幸いです。Dask をより一時的な性質で活用しようとしている他のユーザーの助けになる可能性があります。

Fargate が vCPU に対して .25 -> .50 -> 1 -> 2 -> 4 と増加することを考えると、1 ワーカー対 1 vcpu セットアップを使用しても安全だと思います。ただし、Fargate の vcpu 割り当てがどのように機能するかを考えると、ワーカーあたりのスレッド数の適切な上限を選択する方法を理解しておくと役立ちます。

4

0 に答える 0