-3

何年も前に古いフォーラムで、私は Primozh に、パイプライン パターンが Uroboros のようなもので、半分完全な結果をそれ自体にフィードバックできるかどうか尋ねました。

当時 Primozh は、それは単純明快で、PipeLine ステージは OmniValues を OUTPUT だけでなく INPUT にもフィードできると述べていました。

問題は、最初のフィード段階があまりにも速く実行され、期限が切れて INPUT コレクションを封印し、それを UN-SEAL する方法がないことです。- OTL が「完了したコレクションに追加できません」という例外をスローします。

では、上記のリンクによるこの自己爆発タスクは、自己給電Pipelineパターンを介してどのように実装できますか?

UPD: 例を「自己爆発」から変更しました - 膨大な量の中間の半分計算された結果を生成する - 順列の生成から、階乗の単純な (希望する) 計算に変更しました。ただし、これには決定論の欠点があります。常に 1 つの中間ジョブ アイテムが生成されるため、増加するコレクションを処理するパイプラインの機能は試されませんでした。

{$A+} // not $A8
type FactTask = record
  Goal, Curr: Cardinal;
  Value : Int64;
end;

procedure TForm6.Button1Click(Sender: TObject);
var Msg: string;
    f: FactTask;
    Results: TArray<Int64>;
    pipeOut: IOmniBlockingCollection;
    pipe:    IOmniPipeline;
begin
  lblResults.Caption := ' WAIT, we are producing...';
  Repaint;

  pipe := Parallel.Pipeline;
  f.Goal := edLen.Value; // 10
  f.Curr := 0;
  f.Value := 1;

  pipe.Stage(
     procedure ( const input, output: IOmniBlockingCollection )
     begin
       output.Add( TOmniValue.FromRecord( f ) );
     end
  );

  pipe.Stage(
     procedure ( const input, output: IOmniBlockingCollection )
     var f_in, f_out: FactTask; v: TOmniValue;
     begin
       for v in input do begin
         f_in := v.ToRecord<FactTask>;
         if f_in.Curr < f_in.Goal then begin
            f_out.Goal := f_in.Goal;
            f_out.Curr := Succ(f_in.Curr);
            f_out.Value := f_in.Value * f_out.Curr;
            input.Add( TOmniValue.FromRecord( f_out ) ); //  <<< Exception!
         end;
       end;
     end
  );

  pipe.Stage(
     procedure ( const input, output: IOmniBlockingCollection )
     var f_in: FactTask;  v: TOmniValue;
     begin
       for v in input do begin
         f_in := v.ToRecord<FactTask>;
         if f_in.Curr = f_in.Goal then begin
            Output.Add( f_in.Value );
         end;
       end;
     end
  );

  pipe.Run;
  pipeOut := pipe.Output;
//    pipe.WaitFor(INFINITE);  ToArray would efficiently do that
//    pipeOut.CompleteAdding;    ...without frozing on Pipeline/Collections SetThrottle
  Results := TOmniBlockingCollection.ToArray<Int64>(pipeOut);

  Msg := IntToStr(f.Goal) + '! = ' + IntToStr(Results[0]);
  lblResults.Caption := Msg;
  ShowMessage(Msg);
end;

によって予期せず封印された入力を再入力しようとするパイプライン ステージでクラッシュしTOmniPipeline.Runます。マークされた行で、「競合するコレクションに追加できません」という例外が予期せずスローされます。

コレクションが空と少数の間でバランスを取っているときにパイプラインを実行し続ける方法 (それは開始条件であるだけでなく、計算終了近くで繰り返されます)?

少し夢を見る: https://plus.google.com/+AriochThe/posts/LCHnSCmZYtx
もう少し: https://github.com/gabr42/OmniThreadLibrary/issues/61

4

1 に答える 1

2

デモ プログラムは正常に動作します。

最初のステージは、その出力に 1 つのレコードを出力するだけです (ちなみに、メイン スレッドで に書き込むことでそれを行うことができますpipe.Input)。その後、出力パイプを閉じます。

これにより、2 番目のステージがシャットダウンされます。終了する前for v in inputに、第 2 ステージは通常、書き込みを試みinputます (タイミングが本当にラッキーでない限り)。ただしinput、すでに閉じられておりAdd、例外が発生します。TryAddの代わりに呼び出すと、Addそれが修正されます。

それPipelineは実際にはあなたが探している抽象化ではなく、何か他のものを使用したほうがよいと思います。TOmniBlockingCollection(ステージ 2 の場合) a をラップする通常の低レベルのタスクを使用します。このブロッキング コレクションを作成してCreate(1)、それ自体が供給され、自動的にブロック解除されることを認識できるようにする必要があります。(詳細については、書籍のツリー内の並列検索の例を参照してください。)

于 2016-01-25T08:48:22.780 に答える