1

ブローカーからのメッセージを消費して処理するスレッドがいくつかあります。<itemId>WI354DE48</itemId>各メッセージは、「処理」するアイテムの一意の ID として機能する英数字要素を含む XMLです。私が制御または変更できない基準により、アイテム/メッセージが、これらのスレッドが消費しているブローカ キューで複製される可能性があります。したがって、同じアイテム (ID が WI354DE48) がキューに 1 回だけ送信されるか、100 回送信される可能性があります。いずれにせよ、アイテムの処理は一度しか許可できません。そのため、スレッド B が既に処理した重複アイテムをスレッド A が処理しないようにする方法が必要です。

すべてのスレッド (ワーカー) で共有できる単純なスレッド セーフ リストを使用して、キャッシュ メカニズムとして機能することを検討しています。各スレッドには、 の同じインスタンスが与えられますList<String>itemId各ワーカー スレッドがメッセージを消費するとき、 (文字列) がリストに存在するかどうかを確認します。そうでない場合は、他のワーカーがアイテムを処理していません。この場合、itemIDがリストに追加され (ロック/キャッシュ)、アイテムが処理されます。itemIdがリストに既に存在する場合は、別のワーカーが既にそのアイテムを処理しているため、無視できます。シンプルですが、効果的です。

明らかに、スレッドセーフなリスト実装を持つことが最も重要です。このリストで呼び出すメソッドは次の 2 つだけです。

  • List#contains(String)- リストのトラバース/検索
  • List#add(String)- リストの変更

...そして、ほぼ同じ頻度で両方のメソッドを呼び出すことに注意することが重要です。めったにcontains()返されず、IDtrueが必要になることはありません。add

最初はそれCopyOnWriteArrayListが私の最善の策だと思っていましたが、Javadocs を読んだ後、各ワーカーはリストの独自のスレッド ローカル コピーを作成してしまうようです。これは私が望んでいるものではありません。次に を調べたCollections.synchronizedList(new ArrayList<String>)ところ、それはまともな賭けのようです。

List<String> processingCache = Collection.synchronizedList(new ArrayList<String>());
List<Worker> workers = getWorkers(processingCache); // Inject the same list into all workers.
for(Worker worker : workers)
    executor.submit(worker);

// Inside each Worker's run method:
@Override
public void run() {
    String itemXML = consumeItemFromBroker();
    Item item = toItem(itemXML);

    if(processingCache.contains(item.getId())
        return;
    else
        processingCache.add(item.getId());

    ... continue processing.
}

私は順調Collections.synchronizedList(new ArrayList<String>)に進んでいますか、それともベースから外れていますか? List私のユースケースを考えると、より効率的なスレッドセーフな実装はありますか?もしそうなら、それはなぜですか?

4

1 に答える 1

1

Collections.synchronizedListは非常に基本的なもので、すべてのメソッドを としてマークするだけですsynchronized

これは機能しますが、いくつかの特定の仮定の下でのみ機能します。Listつまり、.

if(!list.contains(x))
    list.add(x);

2 つの呼び出しの間にモニターが解放されるため、スレッドセーフではありません。

また、すべてのスレッドが排他ロックを取得するため、読み取りが多く書き込みが少ない場合は、多少遅くなる可能性があります。

java.util.concurrentパッケージ内の実装を見ることができます。いくつかのオプションがあります。

ConcurrentHashMapダミー値でa を使用することをお勧めします。

推奨される理由は、ConcurrentHashMapがキー グループを同期しているため、優れたハッシュ アルゴリズムを使用している (String実際に使用している) 場合は、実際に大量の同時スループットを取得できるためです。

ConcurrentSkipListSet順序付けが保証されないため、そのオーバーヘッドが失われるため、 a よりもこれをお勧めします。

もちろん、スレッド化では、ボトルネックがどこにあるかが完全に明白になることはありません。そのため、両方を試して、どちらがパフォーマンスを向上させるかを確認することをお勧めします。

于 2013-11-08T17:31:14.077 に答える