簡単に言えば、循環参照を含む大きなグラフを並列に処理したいのです。また、完全なグラフにアクセスできないため、クロールする必要があります。そして、それを行うための効果的なキューを編成したいと考えています。それを行うためのベストプラクティスはありますか?
私はそのような戦略のために無限のデータ処理フローを編成しようとしています: 各スレッドはキューから処理するノードを取得し、処理後にそれを処理します - 処理のためのいくつかの新しいノードが現れるかもしれません - そのためスレッドはそれらをキューに入れる必要があります. しかし、各ノードを複数回処理する必要はありません。ノードは不変のエンティティです。
私が理解しているように、キューとセットのスレッドセーフな実装を使用する必要があります(すでにアクセスしたインスタンスの場合)。
同期されたメソッドを回避しようとしています。したがって、このフローの私の実装:
スレッドがキューにノードを追加するとき、各ノードをチェックします。visited-nodes-set にこのノードが含まれている場合、スレッドはそれをキューに追加しません。しかし、それだけではありません
スレッドがキューからノードを取得すると、visited-nodes-set にこのノードが含まれているかどうかがチェックされます。含まれている場合、スレッドは、まだ処理されていないノードを取得するまで、キューから別のノードを取得します。未処理のノードを見つけた後、スレッドもそれを訪問ノードセットに追加します。
LinkedBlockingQueue と ConcurrentHashMap を (セットとして) 使用しようとしました。メソッド putIfAbsent(key, value) が含まれているため、ConcurrentHashMap を使用しました。
記述されたアルゴリズムの実装は次のとおりです。
public class ParallelDataQueue {
private LinkedBlockingQueue<String> dataToProcess = new LinkedBlockingQueue<String>();
// using map as a set
private ConcurrentHashMap<String, Object> processedData = new ConcurrentHashMap<String, Object>( 1000000 );
private final Object value = new Object();
public String getNextDataInstance() {
while ( true ) {
try {
String data = this.dataToProcess.take();
Boolean dataIsAlreadyProcessed = ( this.processedData.putIfAbsent( data, this.value ) != null );
if ( dataIsAlreadyProcessed ) {
continue;
} else {
return data;
}
} catch ( InterruptedException e ) {
e.printStackTrace();
}
}
}
public void addData( Collection<String> data ) {
for ( String d : data ) {
if ( !this.processedData.containsKey( d ) ) {
try {
this.dataToProcess.put( d );
} catch ( InterruptedException e ) {
e.printStackTrace();
}
}
}
}
}
だから私の質問 - 現在の実装は反復可能なノードの処理を回避しますか? そして、もっとエレガントな解決策があるのではないでしょうか?
ありがとう
PS
そのような実装は、キュー内のノードの重複の出現を回避しないことを理解しています。しかし、私にとっては重要ではありません。必要なのは、各ノードを複数回処理しないようにすることだけです。