質問で言ったように、パフォーマンスよりもテスト容易性に焦点を当てる必要があります。
生産者/消費者モデルを提案します。データベース (新しい行) に書き込みたいスレッドをいくつでも持つことができ、データベース サーバーに同時実行性を処理させることができます。これはシステムの最初の部分であり、多くのスレッドが行をテーブルにポンピングします。
各行を 1 回だけ処理するには、新しい行をロードして Queue に送り込む単一のスレッドを使用することをお勧めします。次に、キューを処理したい数のスレッドを持つことができます。処理が完了すると、データベースの行を更新したり、別のスレッドがバッチで更新要求を収集して処理する出力キューに書き込んだりできます。
テーブルに PROCESSING_STATUS 列があり、新しい行には常に PROCESSING_STATUS = 0 があるとします。したがって、スレッドはこのテーブルに新しい行を自由に追加できます。別のスレッドは、PROCESSING_STATUS = 0 のすべての行を選択して、このテーブルを (事前定義された間隔/イベントで、または単純にポーリングして) 継続的にクエリします。次に、各行がキューに追加されます。ロードしたら、PROCESSING_STATUS を 1 に更新できます。再度クエリを実行する前にこれを完了する必要があります。これは、同じ行を 2 回ロードしないようにするために重要です。
実際のワーカー スレッドはこのキューを消費します。同時キューまたは同様の構造を使用して、多くのコンシューマーを処理できると仮定します。キュー アルゴリズムは、1 つのスレッドだけが同じ要素を取得できることを保証する必要があります。この種の Queue は、Python、C#、または Java の標準ライブラリで簡単に見つけることができます。次に、実際のトレッドがこの行を処理し、出力キューに書き戻します。
行の書き戻しを担当するスレッドは、作業スレッドが生成したデータと PROCESSING_STATUS 列を更新し、たとえば 2 に設定します。この更新は、読み取り後に変更されていないことを確認するために、行のすべての既知のキーと値を使用して実行する必要があります。書き込みスレッドは、更新クエリで影響を受ける行の値も確認し、処理後に行が削除または変更されていないかどうかを確認する必要があります。
テスト容易性に関しては、PROCESSING_STATUS 列を調べて、未処理の行があるかどうかを確認できます。PROCESSING_STATUS=0 の場合 - この行はロードされていません。1 の場合、ロードされましたが、処理/書き戻しはされていません。2 は、処理されたことを意味します。各行の処理が正しく行われたかどうかを確認する必要がありますが、これは標準的なテストです。
複数のスレッドが同じ行にアクセスしようとしたかどうか、または更新ステートメントで影響を受ける行をチェックして、最初に読み取られた後に行が変更されたかどうかを確認できます。更新によって影響を受ける行がない場合は、すでに処理または変更されていることを意味します。
したがって、このシナリオでのテスト容易性の鍵は、スレッドの同期にキューを使用し、データベースへの更新をチェックすることです。キューと処理スレッドのカウンターを使用して、ロードされた行数 = 処理された行数 = 書き込まれた行数であるかどうかを確認することもできます。
データベースからデータをロードする多くのスレッドが必要な場合は、PROCESSING_STATUS 列の使用も拡張できます。未処理の (新しい) 行が PROCESSING_STATUS = 0 で追加されると想像してください。次に、読み取りスレッドのセット (それぞれが正で 0 とは異なる一意の番号を持つ) は、限定された select ステートメントと更新を結合します。何かのようなもの:
update TABLE_X set PROCESSING_STATUS = MY_UNIQUE_THREAD_ID
where key in (select key from TABLE_X where PROCESSING_STATUS = 0 LIMIT 5)
and PROCESSING_STATUS = 0
影響を受ける行がゼロでない場合、このスレッドにはロードする行がいくつかあります。次のステップは、PROCESSING_STATUS = MY_UNIQUE_THREAD_ID であるすべての行をロードすることです。その後、同じアルゴリズムを再度使用できます。行が処理されると、その PROCESSING_STATUS が MY_UNIQUE_THREAD_ID の負の値で更新されます。このようにデータベースを使用して同時実行を処理しますが、最高のパフォーマンスが得られるわけではありません。少なくとも、すべての行を 1 回だけ処理するという元の問題は解決されます。
データベース サーバーに負荷をかけずに行を 1 回だけロードする別の方法は、キーに対してモジュロ演算を使用することです (キーがシリアル キーの場合)。select ステートメントのキー (k % n_readers) にモジュロを使用します。ロードするには:
SELECT * from TABLE_X WHERE (key % N) == MY_UNIQUE_THREAD_ID