0

簡単に言えば、循環参照を含む大きなグラフを並列に処理したいのです。また、完全なグラフにアクセスできないため、クロールする必要があります。そして、それを行うための効果的なキューを編成したいと考えています。それを行うためのベストプラクティスはありますか?

私はそのような戦略のために無限のデータ処理フローを編成しようとしています: 各スレッドはキューから処理するノードを取得し、処理後にそれを処理します - 処理のためのいくつかの新しいノードが現れるかもしれません - そのためスレッドはそれらをキューに入れる必要があります. しかし、各ノードを複数回処理する必要はありません。ノードは不変のエンティティです。

私が理解しているように、キューとセットのスレッドセーフな実装を使用する必要があります(すでにアクセスしたインスタンスの場合)。

同期されたメソッドを回避しようとしています。したがって、このフローの私の実装:

  1. スレッドがキューにノードを追加するとき、各ノードをチェックします。visited-nodes-set にこのノードが含まれている場合、スレッドはそれをキューに追加しません。しかし、それだけではありません

  2. スレッドがキューからノードを取得すると、visited-nodes-set にこのノードが含まれているかどうかがチェックされます。含まれている場合、スレッドは、まだ処理されていないノードを取得するまで、キューから別のノードを取得します。未処理のノードを見つけた後、スレッドもそれを訪問ノードセットに追加します。

LinkedBlockingQueue と ConcurrentHashMap を (セットとして) 使用しようとしました。メソッド putIfAbsent(key, value) が含まれているため、ConcurrentHashMap を使用しました。

記述されたアルゴリズムの実装は次のとおりです。

public class ParallelDataQueue {

   private LinkedBlockingQueue<String> dataToProcess = new LinkedBlockingQueue<String>();
   // using map as a set
   private ConcurrentHashMap<String, Object> processedData = new ConcurrentHashMap<String, Object>( 1000000 );
   private final Object value = new Object();

   public String getNextDataInstance() {
    while ( true ) {
        try {
            String data = this.dataToProcess.take();
            Boolean dataIsAlreadyProcessed = ( this.processedData.putIfAbsent( data, this.value ) != null );
            if ( dataIsAlreadyProcessed ) {
                continue;
            } else {
                return data;
            }
        } catch ( InterruptedException e ) {
            e.printStackTrace();
        }
      }
    }

    public void addData( Collection<String> data ) {
    for ( String d : data ) {
        if ( !this.processedData.containsKey( d ) ) {
            try {
                this.dataToProcess.put( d );
            } catch ( InterruptedException e ) {
                e.printStackTrace();
            }
        }
       }
     }

}

だから私の質問 - 現在の実装は反復可能なノードの処理を回避しますか? そして、もっとエレガントな解決策があるのではないでしょうか?

ありがとう

PS

そのような実装は、キュー内のノードの重複の出現を回避しないことを理解しています。しかし、私にとっては重要ではありません。必要なのは、各ノードを複数回処理しないようにすることだけです。

4

3 に答える 3

0

マルチスレッド方式でデータを処理する必要がある場合は、コレクションはまったく必要ないかもしれません。Executorsフレームワークの使用について考えませんでしたか?:

public static void main(String[] args) throws InterruptedException {
    ExecutorService exec = Executors.newFixedThreadPool(100);
    while (true) { // provide data ininitely
        for (int i = 0; i < 1000; i++)
            exec.execute(new DataProcessor(UUID.randomUUID(), exec));
        Thread.sleep(10000); // wait a bit, then continue;
    }
}

static class DataProcessor implements Runnable {
    Object data;
    ExecutorService exec;
    public DataProcessor(Object data, ExecutorService exec) {
        this.data = data;
        this.exec = exec;
    }
    @Override
    public void run() {
        System.out.println(data); // process data
        if (new Random().nextInt(100) < 50) // add new data piece for execution if needed
            exec.execute(new DataProcessor(UUID.randomUUID(), exec));
    }

}
于 2012-06-30T10:46:04.543 に答える
0

はい。使用ConcurrentLinkedQueue( http://docs.oracle.com/javase/1.5.0/docs/api/java/util/concurrent/ConcurrentLinkedQueue.html )

また

スレッドがデータをキューに追加するとき、データの各インスタンスをチェックします。セットにこのデータのインスタンスが含まれている場合、スレッドはそれをキューに追加しません。しかし、それだけではありません

基になる Collection がスレッドセーフでない限り、スレッドセーフなアプローチではありません。(つまり、内部で同期されていることを意味します)しかし、すでにスレッドセーフであるため、チェックを行うのは無意味です...

于 2012-06-30T10:05:06.740 に答える
0

現在の実装では、データ インスタンスの繰り返しが避けられません。「スレッド A」が並行マップにデータが存在するかどうかを確認し、データが存在しないことを報告するとします。しかし、putIfAbsent 行の後の if を実行する直前に、「スレッド A」が中断されます。その際、別の脅威「スレッド B」が CPU によって実行される予定で、同じデータ要素の存在をチェックし、存在しないことを検出して存在しないことを報告し、キューに追加されます。スレッド A が再スケジュールされると、if 行から続行され、再びキューに追加されます。

于 2012-06-30T10:02:14.987 に答える