3

さまざまなスレッドをキューに入れることができるキューがあるので、次の2つのことを保証できます。

  1. リクエストは1つずつ処理されます。
  2. リクエストは到着順に処理されます

2点目は重要です。それ以外の場合は、単純なクリティカルセクションで十分です。私にはさまざまなリクエストのグループがあり、単一のグループ内でのみこれらのポイントを満たさなければなりません。異なるグループからのリクエストは同時に実行できます。

次のようになります。

FTaskQueue.Enqueu('MyGroup');
try
  Do Something (running in context of some thread)
finally
  FTaskQueue.Dequeu('MyGroup');
end;

編集:解決したい問題が隠されているため、実際の実装を削除しました

これが必要なのは、httpリクエストを受け入れるIndyベースのWebサーバーがあるからです。まず、リクエストに対応するセッションを見つけます。次に、そのセッションに対して要求(コード)が実行されます。同じセッションに対して複数のリクエストを取得でき(最初のリクエストがまだ処理されている間に新しいリクエストを取得できることを読んでください)、それらは到着の正しい順序で1つずつ実行する必要があります。そのため、このような状況で使用できる汎用同期キューを探して、リクエストをキューに入れることができるようにします。私はスレッドを制御できず、各リクエストは異なるスレッドで実行される可能性があります。

この種の問題に対する最良の(通常の)アプローチは何ですか?問題は、正しい順序が維持されるように、エンキューとデキューはアトミック操作でなければならないことです。私の現在の実装にはかなりのボトルネックがありますが、機能します。

編集:以下はアトミックエンキュー/デキュー操作の問題です

あなたは通常このようなことをします:

procedure Enqueue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoEnqueue;
  finally 
    LeaveCriticalSection(FCritSec);
  end;

  BlockTheCurrentThread; // here the thread blocks itself
end;

procedure Dequeue;
begin
  EnterCriticalSection(FCritSec);
  try
    DoDequeue;
    UnblockTheNextThread; // here the thread unblocks another thread
  finally 
    LeaveCriticalSection(FCritSec);
  end;
end;

ここでの問題は、これがアトミックではないということです。1つのスレッドがすでにキューにあり、別のスレッドが来てEnqueueを呼び出す場合、2番目のスレッドがクリティカルセクションを離れて自分自身をブロックしようとする可能性があります。これで、スレッドスケジューラは最初のスレッドを再開し、次の(2番目の)スレッドのブロックを解除しようとします。ただし、2番目のスレッドはまだブロックされていないため、何も起こりません。これで、2番目のスレッドが続行され、それ自体がブロックされますが、ブロックが解除されないため、これは正しくありません。ブロッキングがクリティカルセクション内にある場合、そのクリティカルセクションが離れることはなく、デッドロックが発生します。

4

3 に答える 3

9

別のアプローチ:

各リクエストスレッドに、最初は設定されていない手動リセットイベントを持たせます。キュー・マネージャーは、そのようなイベントのスレッドセーフなリストを維持する単純なオブジェクトです。Enqueue()Dequeue()メソッドはどちらも、リクエストスレッドのイベントをパラメータとして受け取ります。

type
  TRequestManager = class(TObject)
  strict private
    fCritSect: TCriticalSection;
    fEvents: TList<TEvent>;
  public
    constructor Create;
    destructor Destroy; override;

    procedure Enqueue(ARequestEvent: TEvent);
    procedure Dequeue(ARequestEvent: TEvent);
  end;

{ TRequestManager }

constructor TRequestManager.Create;
begin
  inherited Create;
  fCritSect := TCriticalSection.Create;
  fEvents := TList<TEvent>.Create;
end;

destructor TRequestManager.Destroy;
begin
  Assert((fEvents = nil) or (fEvents.Count = 0));
  FreeAndNil(fEvents);
  FreeAndNil(fCritSect);
  inherited;
end;

procedure TRequestManager.Dequeue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(fEvents.Count > 0);
    Assert(fEvents[0] = ARequestEvent);
    fEvents.Delete(0);
    if fEvents.Count > 0 then
      fEvents[0].SetEvent;
  finally
    fCritSect.Release;
  end;
end;

procedure TRequestManager.Enqueue(ARequestEvent: TEvent);
begin
  fCritSect.Enter;
  try
    Assert(ARequestEvent <> nil);
    if fEvents.Count = 0 then
      ARequestEvent.SetEvent
    else
      ARequestEvent.ResetEvent;
    fEvents.Add(ARequestEvent);
  finally
    fCritSect.Release;
  end;
end;

各リクエストスレッドはEnqueue()キューマネージャーを呼び出し、その後、独自のイベントが通知されるのを待ちます。次に、リクエストを処理して呼び出しますDequeue()

{ TRequestThread }

type
  TRequestThread = class(TThread)
  strict private
    fEvent: TEvent;
    fManager: TRequestManager;
  protected
    procedure Execute; override;
  public
    constructor Create(AManager: TRequestManager);
  end;

constructor TRequestThread.Create(AManager: TRequestManager);
begin
  Assert(AManager <> nil);
  inherited Create(TRUE);
  fEvent := TEvent.Create(nil, TRUE, FALSE, '');
  fManager := AManager;
  Resume;
end;

procedure TRequestThread.Execute;
begin
  fManager.Enqueue(fEvent);
  try
    fEvent.WaitFor(INFINITE);
    OutputDebugString('Processing request');
    Sleep(1000);
    OutputDebugString('Request processed');
  finally
    fManager.Dequeue(fEvent);
  end;
end;

{ TForm1 }

procedure TForm1.Button1Click(Sender: TObject);
var
  i: integer;
begin
  for i := 1 to 10 do
    TRequestThread.Create(fRequestManager);
end;

キュー・マネージャーは、内Enqueue()と内の両方のイベントのリストをロックしDequeue()ます。リストが空の場合Enqueue()はパラメータにイベントを設定し、そうでない場合はイベントをリセットします。次に、イベントをリストに追加します。したがって、最初のスレッドはリクエストを続行でき、他のすべてのスレッドはブロックされます。イベントがリストの一番上から削除され、次Dequeue()のイベントが設定されます(存在する場合)。

そうすれば、最後のリクエストスレッドにより、スレッドを一時停止または再開することなく、次のリクエストスレッドのブロックが完全に解除されます。このソリューションでは、追加のスレッドやウィンドウも必要ありません。必要なのは、リクエストスレッドごとに1つのイベントオブジェクトだけです。

于 2009-12-07T13:20:50.587 に答える
2

コメントからの追加情報を考慮してお答えします。

シリアル化する必要のあるスレッドが多数ある場合は、Windowsが無料で提供しているシリアル化メカニズムを利用できます。各キューを、独自のウィンドウと標準のメッセージループを持つスレッドとします。SendMessage()の代わりにを使用するとPostThreadMessage()、Windowsは、メッセージが処理されるまで送信スレッドをブロックし、正しい実行順序が維持されるようにします。リクエストグループごとに独自のウィンドウを持つスレッドを使用することで、複数のグループが引き続き同時に処理されるようにします。

これは、リクエスト自体が元のスレッドコンテキストとは異なるスレッドコンテキストで処理できる場合にのみ機能する単純なソリューションであり、多くの場合、問題にはなりません。

于 2009-12-07T12:41:38.000 に答える
0

Delphiが提供するTThreadListオブジェクトを試しましたか?

スレッドセーフであり、ロックを管理します。リストは、メインスレッド内のスレッドの「外側」で管理します。

リクエストが新しいタスクを要求したら、それをリストに追加します。スレッドが終了したら、OnTerminateイベントを使用して、リスト内の次のスレッドを呼び出すことができます。

于 2009-12-08T15:49:21.773 に答える