0

知らせ:

元の投稿タイトル

DWScript のマルチスレッド JSON パーサーがスレッド数に応じて拡張されないのはなぜですか?

この問題はDWScript を使用した JSON データの処理に関連していないため、変更されました。問題は、Delphi XE2 から XE7 (テストは XE2 と試用版 XE7 ) のデフォルトのメモリ マネージャーにありますが、このようなアプリケーションで最初に問題が発生しました。


Delphi XE2 で JSON データを処理するマルチスレッドの Win32/Win64 vcl アプリケーションがあります。

各スレッドは、DWScript を使用して JSON データを解析しTdwsJSONValue.ParseString(sJSON)、DWScript メソッドを使用して値を読み取り、結果をレコードとして保存します。

テスト目的で、各スレッドで同じ JSON データを処理します。

単一スレッドの実行ではN、スレッド内でデータを処理するのに数秒かかります。スレッド数を線形にM増やすと (約M * N)、単一スレッド内で同じデータを処理するのに必要な時間が増加します。

その結果、速度の向上はありません。このアプリケーションの他の部分 (JSON データ配信、ターゲット環境への結果の保存) - 期待どおりにスケーリングします。

理由は何ですか?どんなアイデアでも大歓迎です。

補足情報:

  1. Win7/32 および Win7/64、Win8/64 の 2 コアから 12 コア (HT なし) システムでテスト済み

  2. DWScript が利用可能な最速のものとして選択されました (その中には Superobject、組み込みの Delphi などをテストしました)。SO は、DWS の JSON ユニットと同様に動作します。

  3. 以下は、問題を示す完全なコンソール アプリです。これを実行するには、 https ://www.dropbox.com/s/4iuv87ytpcdugk6/json1.zip?dl=0 で入手できるサンプル json データが必要です。このファイルにはjson1.dat、最初のスレッドのデータが含まれています。スレッド数が 16 までの場合は、json1.dat を json2.dat...json16.dat にコピーするだけです。

    プログラムとデータは同じフォルダにある必要があります。実行するには: convert.exe N (N はスレッド数)。

    プログラムは実行時間 (ミリ秒単位) を stout に書き込みます - スレッドで費やされた時間、データの解析時間、TdwsJSONValue オブジェクトの解放 (破棄) 時間。ステートメント_dwsjvData.Destroy;はスケーリングしません。


program Convert;

{$APPTYPE CONSOLE}

{$R *.res}

uses
  System.SysUtils,
  System.Diagnostics,
  System.Classes,
  dwsJSON in 'dwsJSON.pas',
  dwsStrings in 'dwsStrings.pas',
  dwsUtils in 'dwsUtils.pas',
  dwsXPlatform in 'dwsXPlatform.pas';

type

  TWorkerThread = class (TThread)
  private
    _iUid:  Integer;
    _swWatch:  TStopwatch;
    _lRunning:  Boolean;

    _sFileJSonData:  String;
    _fJsonData:  TextFile;

  protected
    constructor Create (AUid: Integer);
    procedure Execute; override;

  published
    property Running: Boolean read _lRunning;

  end;

  TConverter = class (TObject)
  private
    _swWatch0, _swWatch1, _swWatch2:  TStopwatch;

    _dwsjvData:  TdwsJSONValue;

  protected
    constructor Create;
    destructor Destroy; override;

    function Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
  end;

const
  MAX_THREADS = 16;

var
  iHowMany:  Integer;
  athWorker:  array [1..MAX_THREADS] of Pointer;
  aiElapsed:  array [1..MAX_THREADS] of Integer;
  aiElapsedParse:  array [1..MAX_THREADS] of Integer;
  aiElapsedDestroy:  array [1..MAX_THREADS] of Integer;
  aiFares:  array [1..MAX_THREADS] of Integer;
  swWatchT, swWatchP:  TStopwatch;


constructor TWorkerThread.Create (AUid: Integer);
begin
  inherited Create (True);

  _iUid := AUid;
  _swWatch := TStopwatch.Create;
  _sFileJSonData := ExtractFilePath (ParamStr (0)) + 'json' + Trim (IntToStr (_iUid)) + '.dat';

  _lRunning := False;

  Suspended := False;
end;

procedure TWorkerThread.Execute;
var
  j:  Integer;
  sLine:  String;
  slLines:  TStringList;

  oS:  TConverter;
begin
  _lRunning := True;

  oS := TConverter.Create;

  slLines := TStringList.Create;
  System.AssignFile (_fJsonData, _sFileJSonData);
  System.Reset (_fJsonData);
  j := 0;
  repeat
    System.Readln (_fJsonData, sLine);
    slLines.Add (sLine);
    Inc (j);
  until (j = 50);
//  until (System.Eof (_fJsonData));
  System.Close (_fJsonData);

  Sleep (1000);

  _swWatch.Reset;
  _swWatch.Start;

  aiFares [_iUid] := 0;
  aiElapsedParse [_iUid] := 0;
  aiElapsedDestroy [_iUid] := 0;
  for j := 1 to slLines.Count do
    aiFares [_iUid] := aiFares [_iUid] + oS.Calculate (_iUid, slLines.Strings [j - 1], aiElapsedParse [_iUid], aiElapsedDestroy [_iUid]);

  _swWatch.Stop;

  slLines.Free;
  os.Destroy;

  aiElapsed [_iUid] := _swWatch.ElapsedMilliseconds;

  _lRunning := False;
end;

constructor TConverter.Create;
begin
  inherited Create;

  _swWatch0 := TStopwatch.Create;
  _swWatch1 := TStopwatch.Create;
  _swWatch2 := TStopwatch.Create;
end;

destructor TConverter.Destroy;
begin
  inherited;
end;

function TConverter.Calculate (AUid: Integer; AJSonData: String; var AParse, ADestroy: Integer): Integer;
var
  jFare, jTotalFares, iElapsedParse, iElapsedDestroy, iElapsedTotal:  Integer;
begin
  _swWatch0.Reset;
  _swWatch0.Start;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData := TdwsJSONValue.ParseString (AJSonData);
  _swWatch1.Stop;
  iElapsedParse := _swWatch1.ElapsedMilliseconds;

  if (_dwsjvData.ValueType = jvtArray) then
  begin
    _swWatch2.Reset;
    _swWatch2.Start;

    jTotalFares := _dwsjvData.ElementCount;
    for jFare := 0 to (jTotalFares - 1) do
      if (_dwsjvData.Elements [jFare].ValueType = jvtObject) then
      begin

        _swWatch1.Reset;
        _swWatch1.Start;

        _swWatch1.Stop;
      end;
  end;

  _swWatch1.Reset;
  _swWatch1.Start;
  _dwsjvData.Destroy;
  _swWatch1.Stop;
  iElapsedDestroy := _swWatch1.ElapsedMilliseconds;

  _swWatch0.Stop;
  iElapsedTotal := _swWatch0.ElapsedMilliseconds;

  Inc (AParse, iElapsedParse);
  Inc (ADestroy, iElapsedDestroy);

  result := jTotalFares;
end;

procedure MultithreadStart;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    if (athWorker [j] = nil) then
    begin
      athWorker [j] := TWorkerThread.Create (j);

      TWorkerThread (athWorker [j]).FreeOnTerminate := False;
      TWorkerThread (athWorker [j]).Priority := tpNormal;
    end;
end;

procedure MultithreadStop;
var
  j:  Integer;
begin
  for j := 1 to MAX_THREADS do
    if (athWorker [j] <> nil) then
    begin
      TWorkerThread (athWorker [j]).Terminate;
      TWorkerThread (athWorker [j]).WaitFor;

      TWorkerThread (athWorker [j]).Free;
      athWorker [j] := nil;
    end;
end;

procedure Prologue;
var
  j:  Integer;
begin
  iHowMany := StrToInt (ParamStr (1));

  for j := 1 to MAX_THREADS do
    athWorker [j] := nil;

  swWatchT := TStopwatch.Create;
  swWatchT.Reset;

  swWatchP := TStopwatch.Create;
  swWatchP.Reset;
end;

procedure RunConvert;

  function __IsRunning: Boolean;
  var
    j:  Integer;
  begin
    result := False;
    for j := 1 to MAX_THREADS do
      result := result or ((athWorker [j] <> nil) and TWorkerThread (athWorker [j]).Running);
  end;

begin

  swWatchT.Start;

  MultithreadStart;

  Sleep (1000);
  while (__isRunning) do
    Sleep (500);

  MultithreadStop;

  swWatchT.Stop;
  Writeln (#13#10, 'Total time:', swWatchT.ElapsedMilliseconds);
end;

procedure Epilogue;
var
  j:  Integer;
begin
  for j := 1 to iHowMany do
    Writeln ( #13#10, 'Thread # ', j, '  tot.time:', aiElapsed [j], '  fares:', aiFares [j], '  tot.parse:', aiElapsedParse [j], '  tot.destroy:', aiElapsedDestroy [j]);

  Readln;
end;

begin
  try
    Prologue;
    RunConvert;
    Epilogue;

  except
    on E: Exception do
      Writeln (E.ClassName, ': ', E.Message);
  end;
end.
4

3 に答える 3

1

解決策は、デフォルトの Delphi XE2 または XE7 メモリ マネージャを Intel® Threading Building Blocks メモリ マネージャに交換することです。サンプル アプリケーションでは、約 1 倍にスケーリングされます。アプリが 64 ビットの場合、最大 16 のスレッド数を持つ線形。

update: with assumption that number of threads running is less than number of cores

これは、124GB RAM を搭載した KVM 仮想化 Windows 7 を実行する 2 コア/4ht から 12 コア/24ht のマシンでテストされました。

興味深いのは、Win 7 の仮想化です。メモリの割り当てと割り当て解除は、ネイティブの Win 7 の 2 倍の速さです。

結論: マルチスレッド (4 ~ 8 スレッド以上) アプリケーションのスレッドで 10kB ~ 10MB ブロックのメモリ割り当て/割り当て解除操作を頻繁に行う場合は、Intel のメモリ マネージャのみを使用してください。

@André: 正しい方向を示してくれたヒントに感謝します!

これは、テスト用に TBB メモリ マネージャーを使用したユニットです。メイン プロジェクト ファイル .dpr のユニット リストの 1 番目に表示される必要があります。

unit TBBMem;

interface

function  ScalableGetMem  (ASize: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_malloc';
procedure ScalableFreeMem (APtr: Pointer); cdecl; external 'tbbmalloc' name 'scalable_free';
function  ScalableReAlloc (APtr: Pointer; Size: NativeInt): Pointer; cdecl; external 'tbbmalloc' name 'scalable_realloc';

implementation

Function TBBGetMem (ASize: Integer): Pointer;
begin
  result := ScalableGetMem (ASize);
end;

Function TBBFreeMem (APtr: Pointer): Integer;
begin
  ScalableFreeMem (APtr);
  result := 0;
end;

Function TBBReAllocMem (APtr: Pointer; ASize: Integer): Pointer;
begin
  result := ScalableRealloc (APtr, ASize);
end;

const
  TBBMemoryManager:  TMemoryManager = ( GetMem: TBBGetmem;
                                        FreeMem: TBBFreeMem;
                                        ReAllocMem:  TBBReAllocMem; );
var
  oldMemoryManager:  TMemoryManager;

initialization
  GetMemoryManager (oldMemoryManager);
  SetMemoryManager (TBBMemoryManager);

finalization
  SetMemoryManager (oldMemoryManager);

end.
于 2014-11-18T14:59:44.353 に答える
1

私は FastCode MM Challenge の (再) テストを行いましたが、結果は TBB ではあまり良くありませんでした (ブロック ダウンサイズ テストでもメモリ不足の例外が発生しました)。

要するに、この複雑なテストでは、ScaleMM2 と Google TCmalloc が最も高速であり、Fastmm と ScaleMM2 が最も少ないメモリを使用します。

Average Speed Performance: (Scaled so that the winner = 100%)
  XE6         :   70,4
  TCmalloc    :   89,1
  ScaleMem2   :  100,0
  TBBMem      :   77,8

Average Memory Performance: (Scaled so that the winner = 100%)
  XE6         :  100,0
  TCmalloc    :   29,6
  ScaleMem2   :   75,6
  TBBMem      :   38,4

FastCode チャレンジ: https://code.google.com/p/scalemm/source/browse/#svn%2Ftrunk%2FChallenge
TBB 4.3: https://www.threadingbuildingblocks.org/download

于 2014-11-22T20:44:21.670 に答える