43

TThreadedQueue (Generics.Collections) を単一のプロデューサーの複数のコンシューマー スキームで使用しようとしています。(Delphi-XE)。アイデアは、オブジェクトをキューにプッシュし、いくつかのワーカー スレッドがキューを排出できるようにすることです。

ただし、期待どおりには機能しません。2 つ以上のワーカー スレッドが PopItem を呼び出している場合、TThreadedQueue からアクセス違反がスローされます。

PopItem への呼び出しがクリティカル セクションでシリアル化されている場合は、すべて問題ありません。

確かに TThreadedQueue は複数のコンシューマーを処理できるはずなので、何かが足りないのでしょうか、それとも TThreadedQueue の純粋なバグですか?

エラーを生成する簡単な例を次に示します。

program TestThreadedQueue;

{$APPTYPE CONSOLE}

uses
//  FastMM4 in '..\..\..\FastMM4\FastMM4.pas',
  Windows,
  Messages,
  Classes,
  SysUtils,
  SyncObjs,
  Generics.Collections;

type TThreadTaskMsg =
       class(TObject)
         private
           threadID  : integer;
           threadMsg : string;
         public
           Constructor Create( ID : integer; const msg : string);
       end;

type TThreadReader =
       class(TThread)
         private
           fPopQueue   : TThreadedQueue<TObject>;
           fSync       : TCriticalSection;
           fMsg        : TThreadTaskMsg;
           fException  : Exception;
           procedure DoSync;
           procedure DoHandleException;
         public
           Constructor Create( popQueue : TThreadedQueue<TObject>;
                               sync     : TCriticalSection);
           procedure Execute; override;
       end;

Constructor TThreadReader.Create( popQueue : TThreadedQueue<TObject>;
                                  sync     : TCriticalSection);
begin
  fPopQueue:=            popQueue;
  fMsg:=                 nil;
  fSync:=                sync;
  Self.FreeOnTerminate:= FALSE;
  fException:=           nil;

  Inherited Create( FALSE);
end;

procedure TThreadReader.DoSync ;
begin
  WriteLn(fMsg.threadMsg + ' ' + IntToStr(fMsg.threadId));
end;

procedure TThreadReader.DoHandleException;
begin
  WriteLn('Exception ->' + fException.Message);
end;

procedure TThreadReader.Execute;
var signal : TWaitResult;
begin
  NameThreadForDebugging('QueuePop worker');
  while not Terminated do
  begin
    try
      {- Calling PopItem can return empty without waittime !? Let other threads in by sleeping. }
      Sleep(20);
      {- Serializing calls to PopItem works }
      if Assigned(fSync) then fSync.Enter;
      try
        signal:= fPopQueue.PopItem( TObject(fMsg));
      finally
        if Assigned(fSync) then fSync.Release;
      end;
      if (signal = wrSignaled) then
      begin
        try
          if Assigned(fMsg) then
          begin
            fMsg.threadMsg:= '<Thread id :' +IntToStr( Self.threadId) + '>';
            fMsg.Free; // We are just dumping the message in this test
            //Synchronize( Self.DoSync);
            //PostMessage( fParentForm.Handle,WM_TestQueue_Message,Cardinal(fMsg),0);
          end;
        except
          on E:Exception do begin
          end;
        end;
      end;
      except
       FException:= Exception(ExceptObject);
      try
        if not (FException is EAbort) then
        begin
          {Synchronize(} DoHandleException; //);
        end;
      finally
        FException:= nil;
      end;
   end;
  end;
end;

Constructor TThreadTaskMsg.Create( ID : Integer; Const msg : string);
begin
  Inherited Create;

  threadID:= ID;
  threadMsg:= msg;
end;

var
    fSync : TCriticalSection;
    fThreadQueue : TThreadedQueue<TObject>;
    fReaderArr : array[1..4] of TThreadReader;
    i : integer;

begin
  try
    IsMultiThread:= TRUE;

    fSync:=        TCriticalSection.Create;
    fThreadQueue:= TThreadedQueue<TObject>.Create(1024,1,100);
    try
      {- Calling without fSync throws exceptions when two or more threads calls PopItem
         at the same time }
      WriteLn('Creating worker threads ...');
      for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,Nil);
      {- Calling with fSync works ! }
      //for i:= 1 to 4 do fReaderArr[i]:= TThreadReader.Create( fThreadQueue,fSync);
       WriteLn('Init done. Pushing items ...');

      for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

      ReadLn;

    finally
      for i:= 1 to 4 do fReaderArr[i].Free;
      fThreadQueue.Free;
      fSync.Free;
    end;

  except
    on E: Exception do
      begin
        Writeln(E.ClassName, ': ', E.Message);
        ReadLn;
      end;
  end;
end.

更新: TThreadedQueue のクラッシュの原因となった TMonitor のエラーは、Delphi XE2 で修正されました。

更新 2 : 上記のテストでは、空の状態でキューにストレスがかかりました。Darian Miller さんは、満杯の状態でキューに負荷をかけても、XE2 でエラーを再現できることを発見しました。エラーは TMonitor に再び表示されます。詳細については、以下の彼の回答を参照してください。また、QC101114 へのリンクもあります。

アップデート 3 : Delphi-XE2 アップデート 4TMonitorでは、 の問題を解決する修正プログラムが発表されましたTThreadedQueue。これまでの私のテストでは、エラーを再現できなくなりましたTThreadedQueue。キューが空でいっぱいのときに、単一のプロデューサー/複数のコンシューマー スレッドをテストしました。複数のプロデューサー/複数のコンシューマーもテストしました。リーダー スレッドとライター スレッドを 1 から 100 まで変化させましたが、問題はありませんでした。しかし、歴史を知っているので、あえて他の人を壊しTMonitorます。

4

5 に答える 5

19

まあ、多くのテストをしないと断定するのは難しいですが、これは確かに TThreadedQueue または TMonitor のバグのようです。どちらにしても、コードではなくRTLにあります。これを QC レポートとして提出し、上記の例を「再現方法」コードとして使用する必要があります。

于 2011-01-31T23:45:11.947 に答える
10

スレッドや並列処理などを扱う場合は、OmniThreadLibrary http://www.thedelphigeek.com/search/label/OmniThreadLibraryを使用することをお勧めします。Primozは非常に優れた仕事をしており、このサイトには役立つドキュメントがたくさんあります。 .

于 2011-02-01T08:22:48.310 に答える
4

あなたの例はXE2でうまく機能しているようですが、キューをいっぱいにすると、PushItemのAVで失敗します。(XE2 Update1でテスト済み)

再現するには、タスクの作成を100から1100に増やします(キューの深さは1024に設定されています)

for i:= 1 to 1100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));

これは、Windows 7で毎回死にます。最初は、ストレステストを行うために継続的なプッシュを試みましたが、ループ30で失敗し、次にループ16で失敗し、次に65で失敗しましたが、いくつかの間隔で一貫して失敗しました。点。

  iLoop := 0;
  while iLoop < 1000 do
  begin
    Inc(iLoop);
    WriteLn('Loop: ' + IntToStr(iLoop));  
    for i:= 1 to 100 do fThreadQueue.PushItem( TThreadTaskMsg.Create( i,''));
  end;
于 2011-11-17T22:08:36.523 に答える
3

TThreadedQueue クラスを探しましたが、私の D2009 にはないようです。私はこれについて正確に自殺するつもりはありません-Delphiスレッドのサポートは常にエラーでした..エラー...「最適化されていない」ので、TThreadedQueueも同じだと思います:)

PC (プロデューサー/コンシューマー) オブジェクトにジェネリックを使用する理由 単純な TObjectQueue の子孫は問題なく動作します - これを何十年も使用しています - 複数のプロデューサー/コンシューマーで問題なく動作します:

unit MinimalSemaphorePCqueue;

{ Absolutely minimal P-C queue based on TobjectQueue and a semaphore.

The semaphore count reflects the queue count
'push' will always succeed unless memory runs out, then you're stuft anyway.
'pop' has a timeout parameter as well as the address of where any received
object is to be put.
'pop' returns immediately with 'true' if there is an object on the queue
available for it.
'pop' blocks the caller if the queue is empty and the timeout is not 0.
'pop' returns false if the timeout is exceeded before an object is available
from the queue.
'pop' returns true if an object is available from the queue before the timeout
is exceeded.
If multiple threads have called 'pop' and are blocked because the queue is
empty, a single 'push' will make only one of the waiting threads ready.


Methods to push/pop from the queue
A 'semaHandle' property that can be used in a 'waitForMultipleObjects' call.
When the handle is signaled, the 'peek' method will retrieve the queued object.
}
interface

uses
  Windows, Messages, SysUtils, Classes,syncObjs,contnrs;


type

pObject=^Tobject;


TsemaphoreMailbox=class(TobjectQueue)
private
  countSema:Thandle;
protected
  access:TcriticalSection;
public
  property semaHandle:Thandle read countSema;
  constructor create; virtual;
  procedure push(aObject:Tobject); virtual;
  function pop(pResObject:pObject;timeout:DWORD):boolean;  virtual;
  function peek(pResObject:pObject):boolean;  virtual;
  destructor destroy; override;
end;


implementation

{ TsemaphoreMailbox }

constructor TsemaphoreMailbox.create;
begin
{$IFDEF D2009}
   inherited Create;
{$ELSE}
  inherited create;
{$ENDIF}
  access:=TcriticalSection.create;
  countSema:=createSemaphore(nil,0,maxInt,nil);
end;

destructor TsemaphoreMailbox.destroy;
begin
  access.free;
  closeHandle(countSema);
  inherited;
end;

function TsemaphoreMailbox.pop(pResObject: pObject;
  timeout: DWORD): boolean;
// dequeues an object, if one is available on the queue.  If the queue is empty,
// the caller is blocked until either an object is pushed on or the timeout
// period expires
begin // wait for a unit from the semaphore
  result:=(WAIT_OBJECT_0=waitForSingleObject(countSema,timeout));
  if result then // if a unit was supplied before the timeout,
  begin
    access.acquire;
    try
      pResObject^:=inherited pop; // get an object from the queue
    finally
      access.release;
    end;
  end;
end;

procedure TsemaphoreMailbox.push(aObject: Tobject);
// pushes an object onto the queue.  If threads are waiting in a 'pop' call,
// one of them is made ready.
begin
  access.acquire;
  try
    inherited push(aObject); // shove the object onto the queue
  finally
    access.release;
  end;
  releaseSemaphore(countSema,1,nil); // release one unit to semaphore
end;

function TsemaphoreMailbox.peek(pResObject: pObject): boolean;
begin
  access.acquire;
  try
    result:=(count>0);
    if result then pResObject^:=inherited pop; // get an object from the queue
  finally
    access.release;
  end;
end;
end.
于 2011-05-17T20:31:02.813 に答える
1

TThreadedQueueが複数のコンシューマーをサポートすることは想定されていないと思います。ヘルプファイルによると、これはFIFOです。私は、1つのスレッドがプッシュされ、もう1つのスレッド(1つだけ!)がポップしているという印象を受けています。

于 2011-02-01T13:27:45.970 に答える