2

私には 3 つの段階からなる一連の責任があります。

ステージ

ステージの私の実装は次のようになります。

public class InitialStage implements RecordHandler {

    private RecordHandler next;

    @Override
    public void setNext(RecordHandler handler) {
        if (this.next == null) {
            this.next = handler;
        } else {
            this.next.setNext(handler);
        }
    }

    @Override
    public void handleRequest(String record) {
        System.out.println("Processing @ Initial Stage.");
        if (next != null) {
            next.handleRequest(record);
        }
    }
}

プロセスマネージャー

public class ProcessManager {
    private RecordProcessor processor = new RecordProcessor();

    public ProcessManager()
    {
        createPipeLine();
    }

    private void createPipeLine() {
        processor = new RecordProcessor();
        processor.addHandler(new InitialStage());
        processor.addHandler(new MiddleStage());
        processor.addHandler(new FinalStage());
    }

    public void processRecord(String record){
        processor.handleRequest(record);
    }
}

インターフェイス

public interface RecordHandler {
    public void setNext(RecordHandler handler);
    public void handleRequest(String record);
}

そして最後に RecordProcessor

public class RecordProcessor {

    private RecordHandler successor;
    private RecordHandler first;

    public RecordProcessor(){
    }

    public void addHandler(RecordHandler recordHandler){
        if(this.first == null){
            this.first = recordHandler;
        }
        else{
            this.successor.setNext(recordHandler);  
        }
        this.successor = recordHandler;
    }

    public void handleRequest(String record){
        first.handleRequest(record);
    }
}

ここで、ミドル ステージが完了するまでに時間がかかることがわかりました。そこで、スレッド プールの使用に興味があるので、レコードは以下に示す方法で処理されます。

ここでは、3 つのワーカー スレッドを想定しています。

推奨チェーン

質問

新しい要件を満たすために既存の CoR を正しく変更するにはどうすればよいですか?

4

2 に答える 2

1

実行可能な解決策を見つけたので、コミュニティの利益のために自分の質問に答えます。

この問題に対して、少し異なる解決策を開発しました。並列パイプラインを使用する代わりに、マルチスレッド ステージを使用しました。(この場合は中間段階)。つまり、個々のレコードを渡す代わりに、中間ステージは入力キューと出力キューを使用します。入力キューが定義された数のレコードで満たされると、スレッドが生成されます。

後続のステージは、キューがいっぱいになるまでアイドル状態になる可能性がありますが、それでもこの実装は受け入れられます。これは、シーケンシングの必要がない場合に使用できます。シーケンスに従う必要がある場合は、シーケンス ID を保持し、出力キューでソートを実行する必要があります。

中間段階

public class MiddleStage implements RecordHandler {

private RecordHandler next;
private ExecutorService executor = Executors.newFixedThreadPool(5);
private Queue<String> inbound = new LinkedList<String>();
Collection<Callable<String>> tasks = new ArrayList<Callable<String>>();

@Override
public void setNext(RecordHandler handler) {
    if (this.next == null) {
        this.next = handler;
    } else {
        this.next.setNext(handler);
    }
}

@Override
public void handleRequest(String record) {
    System.out.println("Adding new record to Queue : " + record);
    inbound.add(record);
    System.out.println("Queue Size : " + inbound.size());

    if (inbound.size() >= 10)
    {
        System.out.println("Processing the batch.");

        for (int i = 0; i < 10; i++){
            tasks.add(new MiddleWorker(inbound.poll()));
        }

        System.out.println("Processing @ Middle Stage. " + record);
        List <Future<String>> results = null;
        try {
            results = executor.invokeAll(tasks, 60, TimeUnit.SECONDS);
            tasks.clear();
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        for (Future<String> f : results) {
              try {
                String answer = f.get();
                System.out.println("Outbound : " + answer);
                if (next != null) {
                    next.handleRequest(answer);
                }

            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            }
        results.clear();
    }
}
}

ミドルワーカー クラス

public class MiddleWorker implements Callable<String> {

private String output;

public MiddleWorker(String record)
{
    output = record + " : OK";
}

@Override
public String call() throws Exception {
    try
    {
        Thread.sleep(2 * 1000);
    }
    catch(final InterruptedException ex)
    {
        ex.printStackTrace();
    }

    return (output);
}
}

この回答が不明な場合はコメントしてください。回答を修正します。

于 2013-10-03T07:47:43.633 に答える
0

ワークロードをスレッドプールに渡す役割を持つ追加の「ステージ」を作成できます。

public class PoolExecutingStage implements RecordHandler {

    private Executor threadPool;
    private RecordHandler next;

    public PoolExecutingStage(Executor tp) {
         this.threadPool = tp;      
    }

    @Override
    public void setNext(RecordHandler handler) {
           // similar to your example
    }

    @Override
    public void handleRequest(String record) {
        if (next != null) {
            threadPool.execute(new Runnable() {
                 public void run() {
                     next.handleRequest(record);
                 }
            });               
        }
    }
}

processor.addHandler(new PoolExecutingStage(configuredThreadPool));次に、初期ステージと中間ステージの間に呼び出しを追加して、この新しいステージをパイプラインに含める必要があります。


また、これを複数のスレッドで実行するときに必要になる可能性のあるスレッド同期を処理することにも注意してください

于 2013-07-29T16:56:13.940 に答える