4

タプル オブジェクトを並行 Java コレクションに格納し、パターンに一致する最初の要素を返す効率的なブロッキング クエリ メソッドを使用したいと考えています。そのような要素が利用できない場合、そのような要素が存在するまでブロックされます。

たとえば、クラスがある場合:

public class Pair {
  public final String first;
  public final String Second;
  public Pair( String first, String second ) {
    this.first = first;
    this.second = second;
  }
}

そして、次のようなコレクション:

public class FunkyCollection {
  public void add( Pair p ) { /* ... */ }
  public Pair get( Pair p ) { /* ... */ }
}

次のようにクエリしたいと思います。

myFunkyCollection.get( new Pair( null, "foo" ) );

secondこれは、フィールドが「foo」に等しい最初の使用可能なペアを返すか、そのような要素が追加されるまでブロックします。別のクエリの例:

myFunkyCollection.get( new Pair( null, null ) );

値に関係なく、最初に使用可能なペアを返す必要があります。

ソリューションは既に存在しますか? そうでない場合、メソッドを実装するために何を提案しますget( Pair p )か?

明確化:メソッドget( Pair p)は要素も削除する必要があります。名前の選択はあまり賢明ではありませんでした。より適切な名前はtake( ... ).

4

4 に答える 4

3

ここにいくつかのソースコードがあります。基本的には cb160 が言ったことと同じですが、ソース コードがあると、疑問が解決しない場合があります。特に、FunkyCollection のメソッドは同期する必要があります。

メリットンが指摘したように、get メソッドは、新しいオブジェクトが追加されるたびに、ブロックされた get ごとに O(n) スキャンを実行します。また、O(n) 操作を実行してオブジェクトを削除します。これは、最後にチェックした項目への反復子を保持できるリンク リストに似たデータ構造を使用することで改善できます。この最適化のソース コードは提供していませんが、追加のパフォーマンスが必要な場合は実装が難しくありません。

import java.util.*;

public class BlockingQueries
{
    public class Pair
    {
        public final String first;
        public final String second;
        public Pair(String first, String second)
        {
            this.first = first;
            this.second = second;
        }
    }

    public class FunkyCollection
    {
        final ArrayList<Pair> pairs = new ArrayList<Pair>();

        public synchronized void add( Pair p )
        {
            pairs.add(p);
            notifyAll();
        }

        public synchronized Pair get( Pair p ) throws InterruptedException
        {
            while (true)
            {
                for (Iterator<Pair> i = pairs.iterator(); i.hasNext(); )
                {
                    Pair pair = i.next();
                    boolean firstOk = p.first == null || p.first.equals(pair.first);
                    boolean secondOk = p.second == null || p.second.equals(pair.second);
                    if (firstOk && secondOk)
                    {
                        i.remove();
                        return pair;                
                    }
                }
                wait();
            }
        }   
    }

    class Producer implements Runnable
    {
        private FunkyCollection funkyCollection;

        public Producer(FunkyCollection funkyCollection)
        {
            this.funkyCollection = funkyCollection;
        }

        public void run()
        {
            try
            {
                for (int i = 0; i < 10; ++i)
                {
                    System.out.println("Adding item " + i);
                    funkyCollection.add(new Pair("foo" + i, "bar" + i));
                    Thread.sleep(1000);
                }
            }
            catch (InterruptedException e)
            {
                Thread.currentThread().interrupt();
            }
        }
    }

    public void go() throws InterruptedException
    {
        FunkyCollection funkyCollection = new FunkyCollection();
        new Thread(new Producer(funkyCollection)).start();
        System.out.println("Fetching bar5.");
        funkyCollection.get(new Pair(null, "bar5"));
        System.out.println("Fetching foo2.");
        funkyCollection.get(new Pair("foo2", null));
        System.out.println("Fetching foo8, bar8");
        funkyCollection.get(new Pair("foo8", "bar8"));
        System.out.println("Finished.");
    }

    public static void main(String[] args) throws InterruptedException
    {
        new BlockingQueries().go();
    }
}

出力:

Fetching bar5.
Adding item 0
Adding item 1
Adding item 2
Adding item 3
Adding item 4
Adding item 5
Fetching foo2.
Fetching foo8, bar8
Adding item 6
Adding item 7
Adding item 8
Finished.
Adding item 9

実行しやすくするために、すべてを 1 つのソース ファイルにまとめていることに注意してください。

于 2010-01-09T23:22:57.037 に答える
3

この動作を提供する既存のコンテナはありません。直面する問題の 1 つは、クエリに一致する既存のエントリがない場合です。その場合、新しいエントリが到着するまで待つ必要があり、それらの新しいエントリはシーケンスの末尾に到着するはずです。ブロックしていることを考えると、最新の追加に先行するすべてのエントリを調べる必要はありません。なぜなら、既にそれらを調べて、それらが一致しないと判断したからです。したがって、現在の位置を記録し、新しいエントリが到着するたびにそこから前方に検索できる方法が必要です。

この待機は の仕事ですConditioncb160の回答で示唆されているConditionように、コレクション内にインスタンスを割り当て、Condition#await(). get()時限待機を可能にするために、コンパニオン オーバーロードをメソッドに公開する必要もあります。

public Pair get(Pair p) throws InterruptedException;
public Pair get(Pair p, long time, TimeUnit unit) throws InterruptedException;

を呼び出すたびにadd()、 を呼び出して、満たされていないクエリをCondition#signalAll()待機しているスレッドのブロックを解除し、最近の追加をスキャンできるようにします。get()

このコンテナからアイテムを削除する方法または削除するかどうかについては言及していません。コンテナーが成長するだけの場合、コンテナーを変更する他のスレッドからの競合を心配することなく、スレッドがその内容をスキャンする方法が簡素化されます。各スレッドは、検査可能なエントリの最小数に関して自信を持ってクエリを開始できます。ただし、アイテムの削除を許可すると、直面する課題がさらに多くなります。

于 2010-01-09T23:37:34.600 に答える
2

FunkyCollection add メソッドでは、要素を追加するたびにコレクション自体で notifyAll を呼び出すことができます。

get メソッドで、基になるコンテナー (適切なコンテナーであれば問題ありません) に必要な値が含まれていない場合は、FunkyCollection を待ちます。待機が通知されたら、基になるコンテナーに必要な結果が含まれているかどうかを確認します。存在する場合は値を返し、そうでない場合は再度待機します。

于 2010-01-09T23:06:04.040 に答える
1

Tuple Spaces の実装を探しているようです。それらに関するウィキペディアの記事には、Java の実装がいくつかリストされています。おそらく、そのうちの 1 つを使用できます。それができない場合は、模倣するオープン ソースの実装や、関連する研究論文を見つけることができます。

于 2010-01-10T00:17:55.893 に答える