5

最高のスループットで多数の要素と一致する Java 同時実行イディオムを探しています。

複数のスレッドから「人」が入ってくると考えてください。各「人」は一致を探しています。別の待機中の「人」が見つかると、それらは互いに割り当てられ、処理のために削除されます。

状態を変更するために大きな構造をロックしたくありません。Person に getMatch と setMatch があるとします。送信される前は、各人の #getMatch は null です。しかし、それらがブロック解除された (または釣り上げられた) 場合、一致を待ち望んでいたために有効期限が切れているか、#getMatch が null ではないかのいずれかです。

高いスループットを維持する際の問題は、PersonA が PersonB と同時に送信された場合です。それらは互いに一致しますが、PersonB も既に待機している PersonC と一致します。送信されると、PersonB の状態が「利用可能」に変わります。ただし、PersonB が PersonC と照合されている間に、PersonA が誤って PersonB を取得してはなりません。わかる?また、非同期で動作する方法でこれを行いたいと考えています。言い換えれば、各サブミッターが、waitForMatch タイプのスレッドで Person を保持する必要はありません。

繰り返しますが、リクエストを別のスレッドで実行する必要はありませんが、マッチ メーカー スレッドが 1 つ追加されても問題ありません。

これはかなり一般的なことのように見えるので、これにはいくつかのイディオムがあるはずです。しかし、私のグーグル検索はうまくいきませんでした(間違った用語を使用している可能性があります)。

アップデート

この問題を私にとって難しくしていることがいくつかあります。1 つは、オブジェクトをメモリに保持したくないということです。すべての待機中の候補を redis や memcache などに保持したいと考えています。もう 1 つは、どの人にも複数の一致候補がある可能性があるということです。次のようなインターフェースを検討してください。

person.getId();         // lets call this an Integer
person.getFriendIds();  // a collection of other person ids

次に、次のようなサーバーがあります。

MatchServer:
   submit( personId, expiration ) -> void // non-blocking returns immediately
   isDone( personId ) -> boolean          // either expired or found a match
   getMatch( personId ) -> matchId        // also non-blocking

これは残りのインターフェイス用であり、結果が得られるまでリダイレクトを使用します。私が最初に考えたのは、Redis のようなものに支えられ、現在ロックされていて操作されているオブジェクトに対して同時に弱い値のハッシュ マップを持つ MatchServer に Cache を用意することでした。各 personId は、送信済み、一致、期限切れなどの状態を持つ永続的な状態オブジェクトによってラップされます。

ここまでフォロー?非常に単純です。送信コードが最初の作業を行いました。それは次のようなものでした。

public void submit( Person p, long expiration ) {
    MatchStatus incoming = new MatchStatus( p.getId(), expiration );
    if ( !tryMatch( incoming, p.getFriendIds() ) )
        cache.put( p.getId(), incoming ); 
}

public boolean isDone( Integer personId ) {
    MatchStatus status = cache.get( personId );
    status.lock();
    try {
        return status.isMatched() || status.isExpired();

    } finally {
        status.unlock();
    }
}

public boolean tryMatch( MatchStatus incoming, Iterable<Integer> friends ) {
    for ( Integer friend : friends ) {
        if ( match( incoming, friend ) )
            return true;
    }

    return false;
}

private boolean match( MatchStatus incoming, Integer waitingId ) {
    CallStatus waiting = cache.get( waitingId );
    if ( waiting == null )
        return false;

    waiting.lock();
    try {
        if ( waiting.isMatched() )
            return false;

        waiting.setMatch( incoming.getId() );
        incoming.setMatch( waiting.getId() );

        return true
    } finally {
        waiting.unlock();
    }
}

ここでの問題は、2 人が同時に入ってきて、2 人しか一致しない場合、お互いを見つけられないことです。競合状態ですよね?それを解決する唯一の方法は、「tryMatch()」を同期することでした。しかし、それは私のスループットを殺します。これらは非常に短い呼び出しにする必要があるため、tryMatch を無期限にループさせることはできません。

では、これにアプローチするより良い方法は何ですか? 私が思いつくすべてのソリューションは、一度に 1 人ずつ人を強制するものであり、スループットにはあまり適していません。たとえば、バックグラウンド スレッドを作成し、ブロッキング キューを使用して着信を 1 つずつ入れたり受け取ったりします。

ガイダンスをいただければ幸いです。

4

4 に答える 4

1

マッチング システムの詳細についてはまだ明確ではありませんが、一般的なガイダンスを提供できます。

基本的に、アトミックな読み取り-変更-書き込み機能がなければ、プロセスを同期することはできません。データベースからそれを取得する方法については説明しません。簡単なもの (Transaction Isolation を備えた SQL データベース) から不可能なもの (一部の NoSQL データベース) までさまざまだからです。データベースから取得できない場合は、メモリ内で同期を行うしかありません。

第 2 に、一致する 2 人を可用性プールから同時にアトミックに削除できる必要があります。ただし、同じアトミック操作の一部として、それらを相互に割り当てる前に、両方がまだ使用可能であることも確認する必要があります。

第 3 に、スループットを最大化するには、競合状態を防止するのではなく、競合状態を検出するようにシステムを設計し、競合が検出されたときに回復手順を実装します。

上記のすべては、データベースよりもメモリで実行する方がはるかに簡単です (そしてパフォーマンスが向上します)。だから、できれば記憶の中でやります。

  1. 挿入によって順序付けられたインメモリ マッチング プールを作成して、すべてのリクエストがどのリクエストが前に来てどのリクエストが後に来たかを認識できるようにします。(これは、行われたリクエストの順序を反映する必要はありません。プールに挿入された順序である必要があります。)
  2. リクエストが入ります。リクエストはメモリ内マッチング プールに入り、データベースのステータスは「検索中」に変わります。
  3. 要求スレッドは、一致する古い要求をメモリ内プールで検索します。
    1. 見つかった場合は一致です。
    2. 何も見つからない場合、要求スレッドは終了します。
    3. 検索中に新しいリクエストと一致した場合は、検索を停止し、新しいリクエストがプールから削除できるようにします。
  4. 一致すると、新しいリクエストは古いリクエストに検索を停止するよう通知し、両方のリクエストがプールから削除されます。レースが検出された場合、それを検出した人は誰でも、行っていることを停止/元に戻し、新しい情報に従って続行します。競合検出の順序を設計して、この動作が孤立した一致 (デッドロックに相当するもの) にならないようにする必要がありますが、これは完全に実行可能です。
  5. 一致がプールから削除された後、それらのデータベース ステータスが更新されます。
  6. 別のワーカー スレッドがキューを古いものから新しいものへとスキャンし、期限切れの要求を削除して、データベースを新しい状態で更新します。

このシステムでは、ブロッキング同期アクションのみが一致プールに挿入され、一致プールから削除されます。これらは個別のロックです。(リクエスト スレッドは、そのリクエストをマッチ プールから削除する前に、ロックを取得し、それがまだマッチ プールにあるかどうかを確認する必要があります。そうでない場合は、レース リカバリ手順に分岐します。)同期できる量の制限。(わかりました。プールがいっぱいになったときに、プールへの挿入もブロックする必要があると思いますが、他に何ができますか? 新しいプールを作成できれば、既存のプールを拡張できます。)

要求キューを並べ替えて順番に検索することで、要求元スレッドが完全な検索を実行できることが保証されることに注意してください。検索が見つからない場合、唯一の希望は、後の要求が一致し、その一致が後の要求スレッドによって検出されることです。

于 2013-05-15T07:15:16.950 に答える
0

より優れたシンプルなものが提案されるまで、私は非常にシンプルなアプローチを採用しました。BlockingQueue を処理する単一のバックグラウンド スレッド。スループットは高くありませんが、送信者がブロックする必要はありません。また、ウェイターの永続キャッシュで同期を必要としないという利点もあります。BlockingQueue を永続化された BlockingQueue に簡単に変更できます。送信者は、キューがいっぱいになった場合にのみ待機する必要があります。

唯一の問題は、一度に非常に多くのサブミッターとポーラーが存在し、処理キューがサブミッターにどうしようもなく遅れる場合です。これは、ポンプの単純化された実装です。match メソッドは #getFriendIds をイテレータし、キー付きルックアップを実行して、その ID の人が redis (または何でも) キャッシュに存在するかどうかを確認します。それらがキャッシュにある場合、それらは一致します。お互いのIDを交換して一致させます。

class HoldPump extends Thread {

    private final BlockingQueue<Incoming> queue = new ArrayBlockingQueue<>( CAPACITY );

    HoldPump() {
        super( "MatchingPump" );
    }

    public void submit( Person p ) {
        Incoming incoming = new Incoming( p.getId(), p.getFriendIds() ) );
        queueProcessing( incoming );
    }

    public void queueProcessing( Incoming incoming ) ... {
        queue.put( incoming );
    }

    @Override
    public void run() {
        try {
            while ( true ) {
                Incoming incoming = queue.take();
                tryMatch( incoming );
            }
        } catch ( InterruptedException e ) {
            Thread.interrupted();
        }
    }
}



protected void trytMatch( Incoming incoming ) {
    MatchStatus status = incoming.status;

    status.answer( incoming.holdDuration );

    for ( Integer candidate : incoming.candidates ) {
        MatchStatus waiting = waitingForMatchByPersonId.get( candidate );
        if ( waiting != null ) {
            waiting.setMatch( incoming.status.getPersonId() );
            status.setMatch( waiting.getPersonId() )
        }
    }
}

#setMatch メソッドは基本的に、MatchStatus の再入可能ロックの一部である完了状態を通知します。

于 2013-05-16T18:42:41.413 に答える