知らせ:
元の投稿タイトル
DWScript のマルチスレッド JSON パーサーがスレッド数に応じて拡張されないのはなぜですか?
この問題はDWScript を使用した JSON データの処理に関連していないため、変更されました。問題は、Delphi XE2 から XE7 (テストは XE2 と試用版 XE7 ) のデフォルトのメモリ マネージャーにありますが、このようなアプリケーションで最初に問題が発生しました。
Delphi XE2 で JSON データを処理するマルチスレッドの Win32/Win64 vcl アプリケーションがあります。
各スレッドは、DWScript を使用して JSON データを解析しTdwsJSONValue.ParseString(sJSON)
、DWScript メソッドを使用して値を読み取り、結果をレコードとして保存します。
テスト目的で、各スレッドで同じ JSON データを処理します。
単一スレッドの実行ではN
、スレッド内でデータを処理するのに数秒かかります。スレッド数を線形にM
増やすと (約M * N
)、単一スレッド内で同じデータを処理するのに必要な時間が増加します。
その結果、速度の向上はありません。このアプリケーションの他の部分 (JSON データ配信、ターゲット環境への結果の保存) - 期待どおりにスケーリングします。
理由は何ですか?どんなアイデアでも大歓迎です。
補足情報:
Win7/32 および Win7/64、Win8/64 の 2 コアから 12 コア (HT なし) システムでテスト済み
DWScript が利用可能な最速のものとして選択されました (その中には Superobject、組み込みの Delphi などをテストしました)。SO は、DWS の JSON ユニットと同様に動作します。
以下は、問題を示す完全なコンソール アプリです。これを実行するには、 https ://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0 で入手できるサンプル json データが必要です。このファイルには
json1.dat
、最初のスレッドのデータが含まれています。スレッド数が 16 までの場合は、json1.dat を json2.dat...json16.dat にコピーするだけです。プログラムとデータは同じフォルダにある必要があります。実行するには: convert.exe N (N はスレッド数)。
プログラムは実行時間 (ミリ秒単位) を stout に書き込みます - スレッドで費やされた時間、データの解析時間、TdwsJSONValue オブジェクトの解放 (破棄) 時間。ステートメント
_dwsjvData.Destroy;
はスケーリングしません。
program Convert;
{$APPTYPE CONSOLE}
{$R *.res}
uses
System.SysUtils,
System.Diagnostics,
System.Classes,
dwsJSON in 'dwsJSON.pas',
dwsStrings in 'dwsStrings.pas',
dwsUtils in 'dwsUtils.pas',
dwsXPlatform in 'dwsXPlatform.pas';
type
TWorkerThread = class (TThread)
private
_iUid: Integer;
_swWatch: TStopwatch;
_lRunning: Boolean;
_sFileJSonData: String;
_fJsonData: TextFile;
protected
constructor Create (AUid: Integer);
procedure Execute; override;
published
property Running: Boolean read _lRunning;
end;
TConverter = class (TObject)
private
_swWatch0, _swWatch1, _swWatch2: TStopwatch;
_dwsjvData: TdwsJSONValue;
protected
constructor Create;
destructor Destroy; override;
function Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
end;
const
MAX_THREADS = 16;
var
iHowMany: Integer;
athWorker: array [1..MAX_THREADS] of Pointer;
aiElapsed: array [1..MAX_THREADS] of Integer;
aiElapsedParse: array [1..MAX_THREADS] of Integer;
aiElapsedDestroy: array [1..MAX_THREADS] of Integer;
aiFares: array [1..MAX_THREADS] of Integer;
swWatchT, swWatchP: TStopwatch;
constructor TWorkerThread.Create (AUid: Integer);
begin
inherited Create (True);
_iUid := AUid;
_swWatch := TStopwatch.Create;
_sFileJSonData := ExtractFilePath (ParamStr (0)) + 'json' + Trim (IntToStr (_iUid)) + '.dat';
_lRunning := False;
Suspended := False;
end;
procedure TWorkerThread.Execute;
var
j: Integer;
sLine: String;
slLines: TStringList;
oS: TConverter;
begin
_lRunning := True;
oS := TConverter.Create;
slLines := TStringList.Create;
System.AssignFile (_fJsonData, _sFileJSonData);
System.Reset (_fJsonData);
j := 0;
repeat
System.Readln (_fJsonData, sLine);
slLines.Add (sLine);
Inc (j);
until (j = 50);
// until (System.Eof (_fJsonData));
System.Close (_fJsonData);
Sleep (1000);
_swWatch.Reset;
_swWatch.Start;
aiFares [_iUid] := 0;
aiElapsedParse [_iUid] := 0;
aiElapsedDestroy [_iUid] := 0;
for j := 1 to slLines.Count do
aiFares [_iUid] := aiFares [_iUid] + oS.Calculate (_iUid, slLines.Strings [j - 1], aiElapsedParse [_iUid], aiElapsedDestroy [_iUid]);
_swWatch.Stop;
slLines.Free;
os.Destroy;
aiElapsed [_iUid] := _swWatch.ElapsedMilliseconds;
_lRunning := False;
end;
constructor TConverter.Create;
begin
inherited Create;
_swWatch0 := TStopwatch.Create;
_swWatch1 := TStopwatch.Create;
_swWatch2 := TStopwatch.Create;
end;
destructor TConverter.Destroy;
begin
inherited;
end;
function TConverter.Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
var
jFare, jTotalFares, iElapsedParse, iElapsedDestroy, iElapsedTotal: Integer;
begin
_swWatch0.Reset;
_swWatch0.Start;
_swWatch1.Reset;
_swWatch1.Start;
_dwsjvData := TdwsJSONValue.ParseString (AJSonData);
_swWatch1.Stop;
iElapsedParse := _swWatch1.ElapsedMilliseconds;
if (_dwsjvData.ValueType = jvtArray) then
begin
_swWatch2.Reset;
_swWatch2.Start;
jTotalFares := _dwsjvData.ElementCount;
for jFare := 0 to (jTotalFares - 1) do
if (_dwsjvData.Elements [jFare].ValueType = jvtObject) then
begin
_swWatch1.Reset;
_swWatch1.Start;
_swWatch1.Stop;
end;
end;
_swWatch1.Reset;
_swWatch1.Start;
_dwsjvData.Destroy;
_swWatch1.Stop;
iElapsedDestroy := _swWatch1.ElapsedMilliseconds;
_swWatch0.Stop;
iElapsedTotal := _swWatch0.ElapsedMilliseconds;
Inc (AParse, iElapsedParse);
Inc (ADestroy, iElapsedDestroy);
result := jTotalFares;
end;
procedure MultithreadStart;
var
j: Integer;
begin
for j := 1 to iHowMany do
if (athWorker [j] = nil) then
begin
athWorker [j] := TWorkerThread.Create (j);
TWorkerThread (athWorker [j]).FreeOnTerminate := False;
TWorkerThread (athWorker [j]).Priority := tpNormal;
end;
end;
procedure MultithreadStop;
var
j: Integer;
begin
for j := 1 to MAX_THREADS do
if (athWorker [j] <> nil) then
begin
TWorkerThread (athWorker [j]).Terminate;
TWorkerThread (athWorker [j]).WaitFor;
TWorkerThread (athWorker [j]).Free;
athWorker [j] := nil;
end;
end;
procedure Prologue;
var
j: Integer;
begin
iHowMany := StrToInt (ParamStr (1));
for j := 1 to MAX_THREADS do
athWorker [j] := nil;
swWatchT := TStopwatch.Create;
swWatchT.Reset;
swWatchP := TStopwatch.Create;
swWatchP.Reset;
end;
procedure RunConvert;
function __IsRunning: Boolean;
var
j: Integer;
begin
result := False;
for j := 1 to MAX_THREADS do
result := result or ((athWorker [j] <> nil) and TWorkerThread (athWorker [j]).Running);
end;
begin
swWatchT.Start;
MultithreadStart;
Sleep (1000);
while (__isRunning) do
Sleep (500);
MultithreadStop;
swWatchT.Stop;
Writeln (#13#10, 'Total time:', swWatchT.ElapsedMilliseconds);
end;
procedure Epilogue;
var
j: Integer;
begin
for j := 1 to iHowMany do
Writeln ( #13#10, 'Thread # ', j, ' tot.time:', aiElapsed [j], ' fares:', aiFares [j], ' tot.parse:', aiElapsedParse [j], ' tot.destroy:', aiElapsedDestroy [j]);
Readln;
end;
begin
try
Prologue;
RunConvert;
Epilogue;
except
on E: Exception do
Writeln (E.ClassName, ': ', E.Message);
end;
end.