12

次のインターフェイスを実装する配列に基づくオブジェクトがあります。

public interface PairSupplier<Q, E> {
     public int size();

     public Pair<Q, E> get(int index);
}

その上に特定のイテレータを作成したいと思います:

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     //some magic
}

メソッドnextで、 PairSupplierからいくつかの要素を返したいと思います。

この要素はスレッドに対して一意である必要があり、他のスレッドにはこの要素が含まれていてはなりません。

PairSupplier には最終的なサイズがあるため、常にこのような状況になるとは限りませんが、アプローチしたいと考えています。

要素の順序は関係ありません。スレッドは同じ要素をの時点で取得できます。

: 2 Threads, 5 elements-{1,2,3,4,5}

Thread 1  | Thread 2
   1           2
   3           4
   5           1
   3           2
   4           5

私の解決策

次の呼び出しごとにインクリメントする AtomicInteger インデックスを作成します。

PairSupplier pairs;
AtomicInteger index;

public boolean hasNext(){
     return true;
}

public Pair<Q, E> next(){
     int position = index.incrementAndGet() % pairs.size;
     if (position < 0) {
          position *= -1;
          position = pairs.size - position;
     }
     return pairs.get(position);
}

ペアインデックスはすべてのスレッドで共有されます。

このソリューションはスケーラブルではないことがわかりました (すべてのスレッドがインクリメントになるため)。おそらく誰かがより良いアイデアを持っていますか?

この反復子は、50 ~ 1000 のスレッドで使用されます。

4

7 に答える 7

4

Pairすべてのスレッド間で共有する必要がある情報があります (「これを既に取得した人はいますか?」)。したがって、一般的なケースでは、行き詰まっています。ただし、配列のこのサイズとスレッド数についての考えがある場合は、バケットを使用して負担を軽減できます。

1,000,000 個の配列要素と 1,000 個のスレッドがあることがわかっているとします。各スレッドに範囲を割り当てます (スレッド #1 は要素 0 ~ 999 などを取得します)。1,000 のスレッドが 1 つの AtomicInteger をめぐって競合する代わりに、競合がまったく発生しなくなります!

これは、すべてのスレッドがほぼ同じペースで実行されることが確実な場合に機能します。スレッド #2 がアイドル状態で、スレッド #1 が他の処理でビジーな場合がある場合に対処する必要がある場合は、バケット パターンを少し変更できます。各バケットには AtomicInteger があります。通常、スレッドは自分自身とのみ競合しますが、バケットが空の場合、次のバケットに移動できます。

于 2013-10-15T13:53:44.013 に答える
4

あなたの質問の詳細はあいまいです-あなたの例は、2つのスレッドを同じように処理できることを示唆してPairいますが、説明ではそうではありません。

達成するのがより困難であるため、サプライヤーがサイクルするまでスレッドごとに 1 つを提供する を提供しIterable<Pair<Q,E>>ますPair。その後、それが繰り返されます。

public interface Supplier<T> {
  public int size();

  public T get(int index);

}

public interface PairSupplier<Q, E> extends Supplier<Pair<Q, E>> {
}

public class IterableSupplier<T> implements Iterable<T> {
  // The common supplier to use across all threads.
  final Supplier<T> supplier;
  // The atomic counter.
  final AtomicInteger i = new AtomicInteger();

  public IterableSupplier(Supplier<T> supplier) {
    this.supplier = supplier;
  }

  @Override
  public Iterator<T> iterator() {
    /**
     * You may create a NEW iterator for each thread while they all share supplier
     * and Will therefore distribute each Pair between different threads.
     *
     * You may also share the same iterator across multiple threads.
     *
     * No two threads will get the same pair twice unless the sequence cycles.
     */
    return new ThreadSafeIterator();
  }

  private class ThreadSafeIterator implements Iterator<T> {
    @Override
    public boolean hasNext() {
      /**
       * Always true.
       */
      return true;
    }

    private int pickNext() {
      // Just grab one atomically.
      int pick = i.incrementAndGet();
      // Reset to zero if it has exceeded - but no spin, let "just someone" manage it.
      int actual = pick % supplier.size();
      if (pick != actual) {
        // So long as someone has a success before we overflow int we're good.
        i.compareAndSet(pick, actual);
      }
      return actual;
    }

    @Override
    public T next() {
      return supplier.get(pickNext());
    }

    @Override
    public void remove() {
      throw new UnsupportedOperationException("Remove not supported.");
    }

  }

}

注意: 両方のシナリオに対応するために、コードを少し調整しました。Iteratorスレッドごとに取得することも、スレッドIterator全体で単一のものを共有することもできます。

于 2013-10-28T13:04:31.110 に答える
1

あなたが解決しようとしている問題が何であるかを理解するのに苦労していますか?

各スレッドはコレクション全体を処理しますか?

2 つのスレッドが同じペアで同時に動作できないという懸念はありますか? しかし、各スレッドはコレクション内の各ペアを処理する必要がありますか?

それとも、すべてのスレッドを使用してコレクションを 1 回処理しますか?

于 2013-10-25T03:20:13.177 に答える
0

あなたの例ではあいまいな重要なことが1つあります-これは正確にはどういう意味ですか?

要素の順序は関係ありません。スレッドは同じ要素を別の時点で取得できます。

「別の時間」とはどういう意味ですか? 互いに N ミリ秒以内ですか? 絶対に 2 つのスレッドが同じペアに同時に接触することは絶対にないということですか? と仮定します。

スレッドが同じペアをめぐって互いに競合する可能性を減らしたい場合、およびペアのバッキング配列がある場合は、これを試してください。

  • 配列をサブ配列に分割しますnumPairs / threadCount(実際にサブ配列を作成する必要はありません。異なるオフセットから開始するだけです。ただし、サブ配列と考える方が簡単です)。
  • 各スレッドを異なるサブアレイに割り当てます。スレッドがそのサブ配列を使い果たしたとき、そのサブ配列のインデックスをインクリメントします
    • 6 つのペアと 2 つのスレッドがあるとします。割り当ては Thread-1:[0,1,2] Thread-2:[3,4,5] のようになります。スレッド 1 が開始すると、スレッド 2 とは異なるペアのセットが参照されるため、同じペアを求めて競合することはほとんどありません。
  • 2 つのスレッドが同時に Pair に触れないことが重要な場合は、Pair オブジェクトに触れるすべてのコードをラップします(タイプではなく、インスタンスsynchronized(pair)で同期します!) - 時折ブロッキングが発生する可能性がありますが、 - スレッドは、実際には同じオブジェクトに触れようとしているため、互いにブロックすることしかできません。AtomicInteger

これは絶対にブロックしないという保証はないことに注意してください。そのためには、すべてのスレッドがまったく同じ速度で実行される必要があり、すべての Pair オブジェクトの処理にまったく同じ時間がかかる必要があり、OS のスレッド スケジューラが決して盗まない必要があります。あるスレッドからの時間であり、別のスレッドからの時間ではありません。これらのことを想定することはできません。これにより、作業領域を分割し、共有される状態の最小単位をロックにすることで、並行性が向上する可能性が高くなります。

しかし、これはデータ構造の同時実行性を高めるための通常のパターンです。スレッド間でデータを分割して、同時に同じロックに触れることはめったにありません。

于 2013-10-23T08:40:20.957 に答える
0

最も簡単なのは、ハッシュ セットまたはマップを作成し、すべてのスレッドに一意のハッシュを与えることです。その後、このハッシュコードで単純な get を実行するだけです。

于 2013-10-23T13:33:42.687 に答える
0

これは、標準的な Java セマフォの使用に関する問題です。次の javadoc は、問題とほぼ同様の例を示しています。http://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Semaphore.html

さらにサポートが必要な場合はお知らせください。

于 2013-10-25T17:21:52.003 に答える
0

私はロックとリリースのプロセスを好みます。

スレッドがペア オブジェクトを要求している場合、そのペア オブジェクトはサプライヤーから削除されます。スレッドが新しいペアを要求する前に、「古い」ペアがサプライヤーに再度追加されます。

前から押して最後に入れることができます。

于 2013-10-28T14:39:08.773 に答える