1

たとえば、7 つのスレッドで 169 のレコードを持つ Accuracer DB など、複数のスレッド間でレコードのワークロードを分割するにはどうすればよいですか。

範囲内のレコード数を分割し、各スレッドに範囲を処理させることができたからです。ただし、ユーザーがレコードを削除または追加すると、うまく機能しません。

4

1 に答える 1

3

OmniThreadLibraryを使用すると、データベースからレコードを並行して処理することができます。

パイプラインの抽象化を使用して例を書きました。パイプラインは 3 つの段階で構成されます。

  1. 最初のステージでは、データベースからデータを読み取り、コンテナー オブジェクトのインスタンスを作成して、パイプラインの次のステージのデータを表します。
  2. 第 2 段階では、受信データを処理します。

    • DoSomethingWith単純に約 100 ミリ秒を浪費するプロシージャを呼び出します。データの処理をシミュレートする
    • コンテナ インスタンスのメモリを解放します。
    • 次に、リテラル値1を出力キューに追加して、別のレコードが処理されたことを最終ステージに通知します。

    このステージは、7 つのスレッドで並行して実行するように構成されています。

  3. 最後の段階は、前の段階から完了したレコードの数をカウントするだけです

この例は、コピーして貼り付けるだけで、マシンで実際に動作していることを確認できるコンソール アプリケーションです。

program Project1;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  OtlCommon,
  OtlCollections,
  OtlParallel,
  System.Diagnostics,
  DB, DBClient;

type
  //auxiliar container, used to copy the database data
  //to avoid synchronization. remember TDataSet "current record"
  //may cause conflicts if changed from different threads.
  TContainer = class
  private
    FName: string;
    FID: Int64;
  public
    property ID: Int64 read FID write FID;
    property Name: string read FName write FName;
  end;

//does nothing, but wastes around 100ms. "processing" each record
procedure DoSomethingWith(const AValue: TContainer);
begin
  Sleep(100);
end;

//creates a DataSet on the fly with a random number of records
function CreateDataSet: TClientDataSet;
var
  I: Integer;
begin
  Result := TClientDataSet.Create(nil);
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'ID';
    DataType := ftLargeint;
  end;
  with Result.FieldDefs.AddFieldDef do
  begin
    Name := 'NAME';
    DataType := ftString;
  end;
  Result.CreateDataSet;
  for I := 1 to Random(1000) do
    Result.InsertRecord([I, 'Test']);
end;

var
  RecordsProcessed: Integer;
  SW: TStopwatch;
  Data: TDataSet;
begin
  IsMultiThread := True;
  Randomize;
  Writeln('wait while processing...');
  SW := TStopwatch.Create;
  SW.Start;
  try
    Data := CreateDataSet;
    try
      RecordsProcessed := Parallel.Pipeline
        .Stage(
          procedure (const Input, Output: IOmniBlockingCollection)
          var
            RecData: TContainer;
          begin
            Data.First;
            while not Data.Eof do
            begin
              RecData := TContainer.Create;
              RecData.ID := Data.Fields[0].AsLargeInt;
              RecData.Name := Data.Fields[1].AsString;
              Output.Add(RecData);
              Data.Next;
            end;
          end)
        .Stage(
          procedure (const Input: TOmniValue; var Output: TOmniValue)
          begin
            //process the real thing here
            DoSomethingWith(Input);
            Input.AsObject.Free;
            Output := 1; //another record
          end)
        .NumTasks(7) //this stage is processed by 7 parallel tasks
        .Stage(
           procedure (const Input, Output: IOmniBlockingCollection)
           var
             Recs: Integer;
             Value: TOmniValue;
           begin
             Recs := 0;
             for Value in Input do
               Inc(Recs, Value);
             Output.Add(Recs);
           end)
        .Run.Output.Next;
      SW.Stop;
      Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
      Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
    finally
      Data.Free;
    end;
  except
    on E: Exception do
      Writeln(E.ClassName, ': ', E.Message);
  end;
  readln;
end.

このようにすることの主な利点は次のとおりです。

  • 複数のワーカー間でジョブを分散するための柔軟なメカニズムがあります。一部のレコードの処理に時間がかかる場合、ライブラリが状況を処理するため、可能な限り短い時間で全体の作業を完了することが合理的に期待できます。
  • データベースから最初のレコードの読み取りが完了するとすぐに、最初の処理スレッドが開始されます。
  • ベーステーブルでさらに受信レコードを待つ必要がある場合は、簡単に適応させることができます。ステージの出力キューは、ステージ プロシージャ内のコードが終了するまで終了としてマークされません。ある時点で行う作業がなくなった場合、後続のすべてのステージは、さらにデータが処理されるのを待つだけでブロックされます。
  • パラメータ値を変更するだけで、ワーカー スレッドの数を変更できます。
于 2013-03-10T07:53:36.657 に答える