21

現在、ファイルを転送するためのDelphi XE3クライアント/サーバーアプリケーションに取り組んでいます(Indy FTPコンポーネントを使用)。クライアント部分はフォルダを監視し、内部のファイルのリストを取得し、それらをサーバーにアップロードして、元のファイルを削除します。アップロードは、ファイルを1つずつ処理する別のスレッドによって行われます。ファイルの範囲は0から数千で、サイズも大きく異なります。

これはOSXとWindowsの両方用にコンパイルされたFiremonkeyアプリであるため、OmniThreadLibraryの代わりにTThreadを使用する必要がありました。私の顧客は、アプリケーションがランダムにフリーズすると報告しています。複製することはできませんでしたが、TThreadの経験があまりないので、どこかでデッドロック状態になっている可能性があります。私はかなり多くの例を読みましたが、マルチスレッドの詳細のいくつかについてはまだわかりません。

アプリの構造は単純です。
メインスレッドのタイマーがフォルダーをチェックし、各ファイルに関する情報をレコードに取得します。レコードは汎用TListに格納されます。このリストには、ファイルの名前、サイズ、進行状況、ファイルが完全にアップロードされているか、再試行する必要があるかに関する情報が保持されます。プログレスバーなどのグリッドに表示されるすべて。このリストには、メインスレッドからのみアクセスできます。その後、リストのアイテムは、AddFileメソッド(以下のコード)を呼び出すことによってスレッドに送信されます。スレッドはすべてのファイルを次のようなスレッドセーフキューに保存しますhttp://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
ファイルがアップロードされると、アップローダースレッドはメインスレッドに次のように通知します。 Synchronizeを呼び出します。
メインスレッドは定期的にUploader.GetProgressメソッドを呼び出して、現在のファイルの進行状況を確認して表示します。この関数は実際にはスレッドセーフではありませんが、デッドロックが発生したり、間違ったデータのみが返されたりする可能性がありますか?

進捗チェックを行うための安全で効率的な方法は何でしょうか?

それで、このアプローチは大丈夫ですか、それとも私は何かを逃しましたか?これをどのように行いますか?
たとえば、フォルダの内容を読み取るためだけに新しいスレッドを作成することを考えています。つまり、使用するTListはスレッドセーフにする必要がありますが、GUIグリッドに表示される情報を更新するには、常にアクセスする必要があります。すべての同期によってGUIの速度が低下するだけではないでしょうか。

誰かが見たいと思った場合に備えて、以下に簡略化したコードを投稿しました。そうでなければ、私が一般的に何を使うべきかについていくつかの意見を聞いてうれしいです。主な目標は、OSXとWindowsの両方で動作することです。すべてのファイルと現在のファイルの進行状況に関する情報を表示できるようにする。また、ファイルの数やサイズに関係なく応答します。

これがアップローダースレッドのコードです。読みやすくするために、その一部を削除しました。

type
  TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
  TFileInfo = record
    ID: Integer;
    Path: String;
    Size: Int64;
    UploadedSize: Int64;
    Status: TFileStatus;
  end;

  TUploader = class(TThread)
  private
    FTP: TIdFTP;
    fQueue: TThreadedQueue<TFileInfo>;
    fCurrentFile: TFileInfo;
    FUploading: Boolean;
    procedure ConnectFTP;
    function UploadFile(aFileInfo: TFileInfo): String;
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    procedure SignalComplete;
    procedure SignalError(aError: String);
  protected
    procedure Execute; override;
  public
    property Uploading: Boolean read FUploading;
    constructor Create;
    destructor Destroy; override;
    procedure Terminate;
    procedure AddFile(const aFileInfo: TFileInfo);
    function GetProgress: TFileInfo;
  end;

procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
  fQueue.Enqueue(aFileInfo);
end;

procedure TUploader.ConnectFTP;
begin
  ...
    FTP.Connect;
end;

constructor TUploader.Create;
begin
  inherited Create(false);
  FreeOnTerminate := false;
  fQueue := TThreadedQueue<TFileInfo>.Create;
  // Create the TIdFTP and set ports and other params
  ...
end;

destructor TUploader.Destroy;
begin
  fQueue.Close;
  fQueue.Free;
  FTP.Free;
  inherited;
end;

// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
  Temp: TFileInfo;
begin
  try
    ConnectFTP;
  except
    on E: Exception do
      SignalError(E.Message);
  end;

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while fQueue.Peek(fCurrentFile) = wrSignaled do
    try
      if UploadFile(fCurrentFile) = '' then
      begin
        fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
        SignalComplete;
      end;
    except
      on E: Exception do
        SignalError(E.Message);
    end;
end;

// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
  Result := fCurrentFile;
end;

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
  fCurrentFile.UploadedSize := AWorkCount;
end;

procedure TUploader.SignalComplete;
begin
  Synchronize(
    procedure
    begin
      frmClientMain.OnCompleteFile(fCurrentFile);
    end);
end;

procedure TUploader.SignalError(aError: String);
begin
  try
    FTP.Disconnect;
  except
  end;
  if fQueue.Closed then
    Exit;

  Synchronize(
    procedure
    begin
      frmClientMain.OnUploadError(aError);
    end);
end;

// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
  fQueue.Close;
  inherited;
end;

function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
  Result := 'Error';
  try
    if not FTP.Connected then
      ConnectFTP;
    FUploading := true;
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));     
    Result := '';
  finally
    FUploading := false;
  end;
end;

そして、アップローダーと相互作用するメインスレッドの部分:

......
// Main form
    fUniqueID: Integer;  // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
    fUploader: TUploader;         // The uploader thread
    fFiles: TList<TFileInfo>;
    fCurrentFileName: String;     // Used to display the progress
    function IndexOfFile(aID: Integer): Integer;    //Return the index of the record inside the fFiles given the file ID
  public
    procedure OnCompleteFile(aFileInfo: TFileInfo);
    procedure OnUploadError(aError: String);
  end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
  // show and log the error
end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
  I: Integer;
begin
  I := IndexOfFile(aFileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    aFileInfo.Status := fsUploaded;
    aFileInfo.UploadedSize := aFileInfo.Size;
    FFiles.Items[I] := aFileInfo;
    Inc(FFilesUploaded);
    TFile.Delete(aFileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

procedure TfrmClientMain.ProcessFolder;
var
  NewFiles: TStringDynArray;
  I, J: Integer;
  FileInfo: TFileInfo;
begin
    // Remove completed files from the list if it contains more than XX files
    while FFiles.Count > 1000 do
      if FFiles[0].Status = fsUploaded then
      begin
        Dec(FFilesUploaded);
        FFiles.Delete(0);
      end else
        Break;

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
    for I := 0 to Length(NewFiles) - 1 do
    begin
          FileInfo.ID := FUniqueID;
          Inc(FUniqueID);
          FileInfo.Path := NewFiles[I];
          FileInfo.Size := GetFileSizeByName(NewFiles[I]);
          FileInfo.UploadedSize := 0;
          FileInfo.Status := fsToBeQueued;
          FFiles.Add(FileInfo);

      if (I mod 100) = 0 then
      begin
        UpdateStatusLabel;
        grFiles.RowCount := FFiles.Count;
        Application.ProcessMessages;
        if fUploader = nil then
          break;
      end;
    end;

    // Send the new files and resend failed to the uploader thread
    for I := 0 to FFiles.Count - 1 do
      if (FFiles[I].Status = fsToBeQueued) then
      begin
        if fUploader = nil then
          Break;
        FileInfo := FFiles[I];
        FileInfo.Status := fsQueued;
        FFiles[I] := FileInfo;
        SaveDebug(1, 'Add:    ' + ExtractFileName(FFiles[I].Path));
        FUploader.AddFile(FFiles[I]);
      end;
end;

procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
  FileInfo: TFileInfo;
  I: Integer;
begin
  if (fUploader = nil) or not fUploader.Uploading then
    Exit;
  FileInfo := fUploader.GetProgress;
  I := IndexOfFile(FileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    fFiles.Items[I] := FileInfo;
    fCurrentFileName := ExtractFileName(FileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to FFiles.Count - 1 do
    if FFiles[I].ID = aID then
      Exit(I);
end;
4

3 に答える 3

1

これは問題ではないかもしれませんが、TFileInfo はレコードです。

これは、(非 const/var) パラメータとして渡されると、コピーされることを意味します。これにより、レコードがコピーされたときに参照カウントが更新されないレコード内の文字列などの問題が発生する可能性があります。

試してみるべきことの 1 つは、それをクラスにして、パラメーターとしてインスタンスを渡すことです (つまり、ヒープ上のデータへのポインター)。

他に注意すべきことは、スレッド化された 32 ビット システムでの Int64 の共有 (サイズ値など) です。

これらの更新/読み取りはアトミックに行われず、特定の保護がないため、値の読み取りでスレッド化が原因で上位と下位の 32 ビットが一致しない可能性があります。(たとえば、上位 32 ビットの読み取り、上位 32 ビットの書き込み、下位 32 ビットの書き込み、下位 32 ビットの読み取り、異なるスレッドでの読み取りと書き込み)。これはおそらくあなたが目にしている問題の原因ではなく、4GB を超えるファイル転送を行っていない限り、問題が発生する可能性はほとんどありません。

于 2013-03-02T04:06:51.587 に答える