ブローカーからのメッセージを消費して処理するスレッドがいくつかあります。<itemId>WI354DE48</itemId>
各メッセージは、「処理」するアイテムの一意の ID として機能する英数字要素を含む XMLです。私が制御または変更できない基準により、アイテム/メッセージが、これらのスレッドが消費しているブローカ キューで複製される可能性があります。したがって、同じアイテム (ID が WI354DE48) がキューに 1 回だけ送信されるか、100 回送信される可能性があります。いずれにせよ、アイテムの処理は一度しか許可できません。そのため、スレッド B が既に処理した重複アイテムをスレッド A が処理しないようにする方法が必要です。
すべてのスレッド (ワーカー) で共有できる単純なスレッド セーフ リストを使用して、キャッシュ メカニズムとして機能することを検討しています。各スレッドには、 の同じインスタンスが与えられますList<String>
。itemId
各ワーカー スレッドがメッセージを消費するとき、 (文字列) がリストに存在するかどうかを確認します。そうでない場合は、他のワーカーがアイテムを処理していません。この場合、itemID
がリストに追加され (ロック/キャッシュ)、アイテムが処理されます。itemId
がリストに既に存在する場合は、別のワーカーが既にそのアイテムを処理しているため、無視できます。シンプルですが、効果的です。
明らかに、スレッドセーフなリスト実装を持つことが最も重要です。このリストで呼び出すメソッドは次の 2 つだけです。
List#contains(String)
- リストのトラバース/検索List#add(String)
- リストの変更
...そして、ほぼ同じ頻度で両方のメソッドを呼び出すことに注意することが重要です。めったにcontains()
返されず、IDtrue
が必要になることはありません。add
最初はそれCopyOnWriteArrayList
が私の最善の策だと思っていましたが、Javadocs を読んだ後、各ワーカーはリストの独自のスレッド ローカル コピーを作成してしまうようです。これは私が望んでいるものではありません。次に を調べたCollections.synchronizedList(new ArrayList<String>)
ところ、それはまともな賭けのようです。
List<String> processingCache = Collection.synchronizedList(new ArrayList<String>());
List<Worker> workers = getWorkers(processingCache); // Inject the same list into all workers.
for(Worker worker : workers)
executor.submit(worker);
// Inside each Worker's run method:
@Override
public void run() {
String itemXML = consumeItemFromBroker();
Item item = toItem(itemXML);
if(processingCache.contains(item.getId())
return;
else
processingCache.add(item.getId());
... continue processing.
}
私は順調Collections.synchronizedList(new ArrayList<String>)
に進んでいますか、それともベースから外れていますか? List
私のユースケースを考えると、より効率的なスレッドセーフな実装はありますか?もしそうなら、それはなぜですか?