たとえば、7 つのスレッドで 169 のレコードを持つ Accuracer DB など、複数のスレッド間でレコードのワークロードを分割するにはどうすればよいですか。
範囲内のレコード数を分割し、各スレッドに範囲を処理させることができたからです。ただし、ユーザーがレコードを削除または追加すると、うまく機能しません。
たとえば、7 つのスレッドで 169 のレコードを持つ Accuracer DB など、複数のスレッド間でレコードのワークロードを分割するにはどうすればよいですか。
範囲内のレコード数を分割し、各スレッドに範囲を処理させることができたからです。ただし、ユーザーがレコードを削除または追加すると、うまく機能しません。
OmniThreadLibraryを使用すると、データベースからレコードを並行して処理することができます。
パイプラインの抽象化を使用して例を書きました。パイプラインは 3 つの段階で構成されます。
第 2 段階では、受信データを処理します。
DoSomethingWith
単純に約 100 ミリ秒を浪費するプロシージャを呼び出します。データの処理をシミュレートする1
を出力キューに追加して、別のレコードが処理されたことを最終ステージに通知します。このステージは、7 つのスレッドで並行して実行するように構成されています。
この例は、コピーして貼り付けるだけで、マシンで実際に動作していることを確認できるコンソール アプリケーションです。
program Project1;
{$APPTYPE CONSOLE}
{$R *.res}
uses
System.SysUtils,
OtlCommon,
OtlCollections,
OtlParallel,
System.Diagnostics,
DB, DBClient;
type
//auxiliar container, used to copy the database data
//to avoid synchronization. remember TDataSet "current record"
//may cause conflicts if changed from different threads.
TContainer = class
private
FName: string;
FID: Int64;
public
property ID: Int64 read FID write FID;
property Name: string read FName write FName;
end;
//does nothing, but wastes around 100ms. "processing" each record
procedure DoSomethingWith(const AValue: TContainer);
begin
Sleep(100);
end;
//creates a DataSet on the fly with a random number of records
function CreateDataSet: TClientDataSet;
var
I: Integer;
begin
Result := TClientDataSet.Create(nil);
with Result.FieldDefs.AddFieldDef do
begin
Name := 'ID';
DataType := ftLargeint;
end;
with Result.FieldDefs.AddFieldDef do
begin
Name := 'NAME';
DataType := ftString;
end;
Result.CreateDataSet;
for I := 1 to Random(1000) do
Result.InsertRecord([I, 'Test']);
end;
var
RecordsProcessed: Integer;
SW: TStopwatch;
Data: TDataSet;
begin
IsMultiThread := True;
Randomize;
Writeln('wait while processing...');
SW := TStopwatch.Create;
SW.Start;
try
Data := CreateDataSet;
try
RecordsProcessed := Parallel.Pipeline
.Stage(
procedure (const Input, Output: IOmniBlockingCollection)
var
RecData: TContainer;
begin
Data.First;
while not Data.Eof do
begin
RecData := TContainer.Create;
RecData.ID := Data.Fields[0].AsLargeInt;
RecData.Name := Data.Fields[1].AsString;
Output.Add(RecData);
Data.Next;
end;
end)
.Stage(
procedure (const Input: TOmniValue; var Output: TOmniValue)
begin
//process the real thing here
DoSomethingWith(Input);
Input.AsObject.Free;
Output := 1; //another record
end)
.NumTasks(7) //this stage is processed by 7 parallel tasks
.Stage(
procedure (const Input, Output: IOmniBlockingCollection)
var
Recs: Integer;
Value: TOmniValue;
begin
Recs := 0;
for Value in Input do
Inc(Recs, Value);
Output.Add(Recs);
end)
.Run.Output.Next;
SW.Stop;
Writeln(RecordsProcessed, ' records processed in ', SW.ElapsedMilliseconds, 'ms.');
Writeln('Avg. ', (SW.ElapsedMilliseconds/RecordsProcessed):0:3, 'ms./record');
finally
Data.Free;
end;
except
on E: Exception do
Writeln(E.ClassName, ': ', E.Message);
end;
readln;
end.
このようにすることの主な利点は次のとおりです。