IOCP を使用してスレッド プールを作成する方法を示すDelphi3000の記事を次に示します。私はこのコードの作成者ではありませんが、作成者の情報はソース コードにあります。
ここにコメントとコードを再投稿しています:
誰もがスレッドとは何か、スレッドの原理などを理解しているはずです。スレッドの単純な機能は、あるスレッドから別のスレッドに処理を分離し、同時実行と並列実行を可能にすることです。スレッドの主な原則は単純で、スレッド間で参照される割り当てられたメモリは、アクセスの安全性を確保するためにマーシャリングする必要があります。他にも多くの原則がありますが、これは本当に気にするべきものです。
そして、..
スレッド セーフ キューを使用すると、複数のスレッドが First on First off ベースで安全にキューに値を追加および削除、プッシュおよびポップできます。効率的で適切に作成されたキューを使用すると、スレッド セーフ ロギングの支援からリクエストの非同期処理まで、スレッド化されたアプリケーションの開発に非常に役立つコンポーネントを使用できます。
スレッドプールは、リクエストのキューを管理するために最も一般的に使用されるスレッドまたは複数のスレッドです。たとえば、処理が必要な要求の連続キューを持つ Web サーバーは、スレッド プールを使用して http 要求を管理するか、COM+ または DCOM サーバーはスレッド プールを使用して rpc 要求を処理します。これは、1 つの要求の処理が別の要求に与える影響を少なくするために行われます。たとえば、3 つの要求を同期的に実行し、最初の要求が完了するまでに 1 分かかった場合、2 番目の 2 つの要求は少なくとも 1 分間は完了しません。ほとんどのクライアントにとって、これは受け入れられません。
それで、これを行う方法..
行列からスタート!!
Delphi は利用可能な TQueue オブジェクトを提供しますが、残念ながらスレッド セーフではなく、実際にはあまり効率的でもありませんが、Contnrs.pas ファイルを調べて、ボーランドがそこにスタックとキューを書き込む方法を確認する必要があります。キューに必要な主な機能は、追加と削除/プッシュとポップの 2 つだけです。Add/push は、値、ポインター、またはオブジェクトをキューの最後に追加します。また、remove/pop はキューの最初の値を削除して返します。
TQueue オブジェクトから派生させて、保護されたメソッドをオーバーライドし、クリティカル セクションを追加することもできます。これにより、ある程度の方法が得られますが、新しいリクエストがキューに入るまでキューを待機させ、スレッドを次の状態にする必要があります。新しいリクエストを待つ間、休息します。これは、ミューテックスまたはシグナル イベントを追加することで実行できますが、もっと簡単な方法があります。Windows API は、キューへのスレッド セーフなアクセスを提供する IO 完了キューと、キューで新しい要求を待機している間の休息状態を提供します。
スレッド プールの実装
スレッド プールは非常に単純なものになり、必要な数のスレッドを管理し、各キュー要求を処理対象のイベントに渡します。TThread クラスを実装し、ロジックを実装してクラスの実行イベント内にカプセル化する必要はほとんどないため、別のスレッドのコンテキスト内で任意のオブジェクトの任意のメソッドを実行する単純な TSimpleThread クラスを作成できます。人々がこれを理解したら、あとは割り当てられたメモリだけを気にする必要があります。
実装方法は次のとおりです。
TThreadQueue と TThreadPool の実装
(* Implemented for Delphi3000.com Articles, 11/01/2004
Chris Baldwin
Director & Chief Architect
Alive Technology Limited
http://www.alivetechnology.com
*)
unit ThreadUtilities;
uses Windows, SysUtils, Classes;
type
EThreadStackFinalized = class(Exception);
TSimpleThread = class;
// Thread Safe Pointer Queue
TThreadQueue = class
private
FFinalized: Boolean;
FIOQueue: THandle;
public
constructor Create;
destructor Destroy; override;
procedure Finalize;
procedure Push(Data: Pointer);
function Pop(var Data: Pointer): Boolean;
property Finalized: Boolean read FFinalized;
end;
TThreadExecuteEvent = procedure (Thread: TThread) of object;
TSimpleThread = class(TThread)
private
FExecuteEvent: TThreadExecuteEvent;
protected
procedure Execute(); override;
public
constructor Create(CreateSuspended: Boolean; ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
end;
TThreadPoolEvent = procedure (Data: Pointer; AThread: TThread) of Object;
TThreadPool = class(TObject)
private
FThreads: TList;
FThreadQueue: TThreadQueue;
FHandlePoolEvent: TThreadPoolEvent;
procedure DoHandleThreadExecute(Thread: TThread);
public
constructor Create( HandlePoolEvent: TThreadPoolEvent; MaxThreads: Integer = 1); virtual;
destructor Destroy; override;
procedure Add(const Data: Pointer);
end;
implementation
{ TThreadQueue }
constructor TThreadQueue.Create;
begin
//-- Create IO Completion Queue
FIOQueue := CreateIOCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
FFinalized := False;
end;
destructor TThreadQueue.Destroy;
begin
//-- Destroy Completion Queue
if (FIOQueue <> 0) then
CloseHandle(FIOQueue);
inherited;
end;
procedure TThreadQueue.Finalize;
begin
//-- Post a finialize pointer on to the queue
PostQueuedCompletionStatus(FIOQueue, 0, 0, Pointer($FFFFFFFF));
FFinalized := True;
end;
(* Pop will return false if the queue is completed *)
function TThreadQueue.Pop(var Data: Pointer): Boolean;
var
A: Cardinal;
OL: POverLapped;
begin
Result := True;
if (not FFinalized) then
//-- Remove/Pop the first pointer from the queue or wait
GetQueuedCompletionStatus(FIOQueue, A, Cardinal(Data), OL, INFINITE);
//-- Check if we have finalized the queue for completion
if FFinalized or (OL = Pointer($FFFFFFFF)) then begin
Data := nil;
Result := False;
Finalize;
end;
end;
procedure TThreadQueue.Push(Data: Pointer);
begin
if FFinalized then
Raise EThreadStackFinalized.Create('Stack is finalized');
//-- Add/Push a pointer on to the end of the queue
PostQueuedCompletionStatus(FIOQueue, 0, Cardinal(Data), nil);
end;
{ TSimpleThread }
constructor TSimpleThread.Create(CreateSuspended: Boolean;
ExecuteEvent: TThreadExecuteEvent; AFreeOnTerminate: Boolean);
begin
FreeOnTerminate := AFreeOnTerminate;
FExecuteEvent := ExecuteEvent;
inherited Create(CreateSuspended);
end;
procedure TSimpleThread.Execute;
begin
if Assigned(FExecuteEvent) then
FExecuteEvent(Self);
end;
{ TThreadPool }
procedure TThreadPool.Add(const Data: Pointer);
begin
FThreadQueue.Push(Data);
end;
constructor TThreadPool.Create(HandlePoolEvent: TThreadPoolEvent;
MaxThreads: Integer);
begin
FHandlePoolEvent := HandlePoolEvent;
FThreadQueue := TThreadQueue.Create;
FThreads := TList.Create;
while FThreads.Count < MaxThreads do
FThreads.Add(TSimpleThread.Create(False, DoHandleThreadExecute, False));
end;
destructor TThreadPool.Destroy;
var
t: Integer;
begin
FThreadQueue.Finalize;
for t := 0 to FThreads.Count-1 do
TThread(FThreads[t]).Terminate;
while (FThreads.Count > 0) do begin
TThread(FThreads[0]).WaitFor;
TThread(FThreads[0]).Free;
FThreads.Delete(0);
end;
FThreadQueue.Free;
FThreads.Free;
inherited;
end;
procedure TThreadPool.DoHandleThreadExecute(Thread: TThread);
var
Data: Pointer;
begin
while FThreadQueue.Pop(Data) and (not TSimpleThread(Thread).Terminated) do begin
try
FHandlePoolEvent(Data, Thread);
except
end;
end;
end;
end.
ご覧のとおり、非常に簡単です。これにより、スレッドを介したリクエストのキューイングを非常に簡単に実装でき、スレッド化を必要とするあらゆるタイプの要件をこれらのオブジェクトを使用して実行できるため、多くの時間と労力を節約できます。
これを使用して、1 つのスレッドから複数のスレッドへの要求をキューに入れたり、複数のスレッドから 1 つのスレッドへの要求をキューに入れたりすることができ、これは非常に優れたソリューションになります。
これらのオブジェクトの使用例をいくつか示します。
スレッド セーフ ロギング
複数のスレッドがログ ファイルに非同期的に書き込むことができるようにします。
uses Windows, ThreadUtilities,...;
type
PLogRequest = ^TLogRequest;
TLogRequest = record
LogText: String;
end;
TThreadFileLog = class(TObject)
private
FFileName: String;
FThreadPool: TThreadPool;
procedure HandleLogRequest(Data: Pointer; AThread: TThread);
public
constructor Create(const FileName: string);
destructor Destroy; override;
procedure Log(const LogText: string);
end;
implementation
(* Simple reuse of a logtofile function for example *)
procedure LogToFile(const FileName, LogString: String);
var
F: TextFile;
begin
AssignFile(F, FileName);
if not FileExists(FileName) then
Rewrite(F)
else
Append(F);
try
Writeln(F, DateTimeToStr(Now) + ': ' + LogString);
finally
CloseFile(F);
end;
end;
constructor TThreadFileLog.Create(const FileName: string);
begin
FFileName := FileName;
//-- Pool of one thread to handle queue of logs
FThreadPool := TThreadPool.Create(HandleLogRequest, 1);
end;
destructor TThreadFileLog.Destroy;
begin
FThreadPool.Free;
inherited;
end;
procedure TThreadFileLog.HandleLogRequest(Data: Pointer; AThread: TThread);
var
Request: PLogRequest;
begin
Request := Data;
try
LogToFile(FFileName, Request^.LogText);
finally
Dispose(Request);
end;
end;
procedure TThreadFileLog.Log(const LogText: string);
var
Request: PLogRequest;
begin
New(Request);
Request^.LogText := LogText;
FThreadPool.Add(Request);
end;
これはファイルにログを記録しているため、すべてのリクエストを 1 つのスレッドに処理しますが、より多くのスレッド カウントを使用して豊富な電子メール通知を行うことができます。または、プログラムで何が起こっているか、またはプログラムのステップをプロファイリングすることもできます。この記事はかなり長くなったので、別の記事で。
今のところ、私はこれであなたを残します、楽しんでください..人々が立ち往生しているものがある場合はコメントを残してください.
クリス