現在、ファイルを転送するためのDelphi XE3クライアント/サーバーアプリケーションに取り組んでいます(Indy FTPコンポーネントを使用)。クライアント部分はフォルダを監視し、内部のファイルのリストを取得し、それらをサーバーにアップロードして、元のファイルを削除します。アップロードは、ファイルを1つずつ処理する別のスレッドによって行われます。ファイルの範囲は0から数千で、サイズも大きく異なります。
これはOSXとWindowsの両方用にコンパイルされたFiremonkeyアプリであるため、OmniThreadLibraryの代わりにTThreadを使用する必要がありました。私の顧客は、アプリケーションがランダムにフリーズすると報告しています。複製することはできませんでしたが、TThreadの経験があまりないので、どこかでデッドロック状態になっている可能性があります。私はかなり多くの例を読みましたが、マルチスレッドの詳細のいくつかについてはまだわかりません。
アプリの構造は単純です。
メインスレッドのタイマーがフォルダーをチェックし、各ファイルに関する情報をレコードに取得します。レコードは汎用TListに格納されます。このリストには、ファイルの名前、サイズ、進行状況、ファイルが完全にアップロードされているか、再試行する必要があるかに関する情報が保持されます。プログレスバーなどのグリッドに表示されるすべて。このリストには、メインスレッドからのみアクセスできます。その後、リストのアイテムは、AddFileメソッド(以下のコード)を呼び出すことによってスレッドに送信されます。スレッドはすべてのファイルを次のようなスレッドセーフキューに保存しますhttp://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
ファイルがアップロードされると、アップローダースレッドはメインスレッドに次のように通知します。 Synchronizeを呼び出します。
メインスレッドは定期的にUploader.GetProgressメソッドを呼び出して、現在のファイルの進行状況を確認して表示します。この関数は実際にはスレッドセーフではありませんが、デッドロックが発生したり、間違ったデータのみが返されたりする可能性がありますか?
進捗チェックを行うための安全で効率的な方法は何でしょうか?
それで、このアプローチは大丈夫ですか、それとも私は何かを逃しましたか?これをどのように行いますか?
たとえば、フォルダの内容を読み取るためだけに新しいスレッドを作成することを考えています。つまり、使用するTListはスレッドセーフにする必要がありますが、GUIグリッドに表示される情報を更新するには、常にアクセスする必要があります。すべての同期によってGUIの速度が低下するだけではないでしょうか。
誰かが見たいと思った場合に備えて、以下に簡略化したコードを投稿しました。そうでなければ、私が一般的に何を使うべきかについていくつかの意見を聞いてうれしいです。主な目標は、OSXとWindowsの両方で動作することです。すべてのファイルと現在のファイルの進行状況に関する情報を表示できるようにする。また、ファイルの数やサイズに関係なく応答します。
これがアップローダースレッドのコードです。読みやすくするために、その一部を削除しました。
type
TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
TFileInfo = record
ID: Integer;
Path: String;
Size: Int64;
UploadedSize: Int64;
Status: TFileStatus;
end;
TUploader = class(TThread)
private
FTP: TIdFTP;
fQueue: TThreadedQueue<TFileInfo>;
fCurrentFile: TFileInfo;
FUploading: Boolean;
procedure ConnectFTP;
function UploadFile(aFileInfo: TFileInfo): String;
procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
procedure SignalComplete;
procedure SignalError(aError: String);
protected
procedure Execute; override;
public
property Uploading: Boolean read FUploading;
constructor Create;
destructor Destroy; override;
procedure Terminate;
procedure AddFile(const aFileInfo: TFileInfo);
function GetProgress: TFileInfo;
end;
procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
fQueue.Enqueue(aFileInfo);
end;
procedure TUploader.ConnectFTP;
begin
...
FTP.Connect;
end;
constructor TUploader.Create;
begin
inherited Create(false);
FreeOnTerminate := false;
fQueue := TThreadedQueue<TFileInfo>.Create;
// Create the TIdFTP and set ports and other params
...
end;
destructor TUploader.Destroy;
begin
fQueue.Close;
fQueue.Free;
FTP.Free;
inherited;
end;
// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
Temp: TFileInfo;
begin
try
ConnectFTP;
except
on E: Exception do
SignalError(E.Message);
end;
// Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
while fQueue.Peek(fCurrentFile) = wrSignaled do
try
if UploadFile(fCurrentFile) = '' then
begin
fQueue.Dequeue(Temp); // Delete the item from the queue if succesful
SignalComplete;
end;
except
on E: Exception do
SignalError(E.Message);
end;
end;
// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
Result := fCurrentFile;
end;
// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
fCurrentFile.UploadedSize := AWorkCount;
end;
procedure TUploader.SignalComplete;
begin
Synchronize(
procedure
begin
frmClientMain.OnCompleteFile(fCurrentFile);
end);
end;
procedure TUploader.SignalError(aError: String);
begin
try
FTP.Disconnect;
except
end;
if fQueue.Closed then
Exit;
Synchronize(
procedure
begin
frmClientMain.OnUploadError(aError);
end);
end;
// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
fQueue.Close;
inherited;
end;
function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
Result := 'Error';
try
if not FTP.Connected then
ConnectFTP;
FUploading := true;
FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));
Result := '';
finally
FUploading := false;
end;
end;
そして、アップローダーと相互作用するメインスレッドの部分:
......
// Main form
fUniqueID: Integer; // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
fUploader: TUploader; // The uploader thread
fFiles: TList<TFileInfo>;
fCurrentFileName: String; // Used to display the progress
function IndexOfFile(aID: Integer): Integer; //Return the index of the record inside the fFiles given the file ID
public
procedure OnCompleteFile(aFileInfo: TFileInfo);
procedure OnUploadError(aError: String);
end;
// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
// show and log the error
end;
// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
I: Integer;
begin
I := IndexOfFile(aFileInfo.ID);
if (I >= 0) and (I < fFiles.Count) then
begin
aFileInfo.Status := fsUploaded;
aFileInfo.UploadedSize := aFileInfo.Size;
FFiles.Items[I] := aFileInfo;
Inc(FFilesUploaded);
TFile.Delete(aFileInfo.Path);
colProgressImg.UpdateCell(I);
end;
end;
procedure TfrmClientMain.ProcessFolder;
var
NewFiles: TStringDynArray;
I, J: Integer;
FileInfo: TFileInfo;
begin
// Remove completed files from the list if it contains more than XX files
while FFiles.Count > 1000 do
if FFiles[0].Status = fsUploaded then
begin
Dec(FFilesUploaded);
FFiles.Delete(0);
end else
Break;
NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
for I := 0 to Length(NewFiles) - 1 do
begin
FileInfo.ID := FUniqueID;
Inc(FUniqueID);
FileInfo.Path := NewFiles[I];
FileInfo.Size := GetFileSizeByName(NewFiles[I]);
FileInfo.UploadedSize := 0;
FileInfo.Status := fsToBeQueued;
FFiles.Add(FileInfo);
if (I mod 100) = 0 then
begin
UpdateStatusLabel;
grFiles.RowCount := FFiles.Count;
Application.ProcessMessages;
if fUploader = nil then
break;
end;
end;
// Send the new files and resend failed to the uploader thread
for I := 0 to FFiles.Count - 1 do
if (FFiles[I].Status = fsToBeQueued) then
begin
if fUploader = nil then
Break;
FileInfo := FFiles[I];
FileInfo.Status := fsQueued;
FFiles[I] := FileInfo;
SaveDebug(1, 'Add: ' + ExtractFileName(FFiles[I].Path));
FUploader.AddFile(FFiles[I]);
end;
end;
procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
FileInfo: TFileInfo;
I: Integer;
begin
if (fUploader = nil) or not fUploader.Uploading then
Exit;
FileInfo := fUploader.GetProgress;
I := IndexOfFile(FileInfo.ID);
if (I >= 0) and (I < fFiles.Count) then
begin
fFiles.Items[I] := FileInfo;
fCurrentFileName := ExtractFileName(FileInfo.Path);
colProgressImg.UpdateCell(I);
end;
end;
function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
I: Integer;
begin
Result := -1;
for I := 0 to FFiles.Count - 1 do
if FFiles[I].ID = aID then
Exit(I);
end;