0
// multiple server instances (replicas), coordinated using MsgCoordinationService
public class Server {

    ConcurrentHashMap<TxID,Future<Msg>> local_registry = new ...
    MsgCoordinationService coordination_service = new .. 

    ...

    // Socket instance to communicate with a client...
    public void accept(Socket s) {
        new Thread(new Worker(s)).start();
    }

    // propose msg to coordination service, register demand to respond to client in local registry
    public Future<Msg> register(Msg m) {
        FutureMsg f = new MsgFuture(); // Future handle w. reference to an empty Msg object [= response]
        TxID uniqueID = coordination_service.propose(s); // transaction ID
        local_registry.add(uniqueID, f);
        return f;
    }

    // called by coordination service, guaranteeing a global order on msg deliveries
    public synchronized void deliver(TxID id, Msg m) {
        ... process Msg object [request]
        ... if local_registry.contains(id), 'compile' response
            (using the Msg object from FutureMsg f, f.get() - f.isDone() when a certain Msg flag has been set)

        ___ now:
        ... notify waiting 'Worker' threads to check whether their 'Future' object isDone()
    }

    private class Worker implements Runnable {
        ...
        public void run() {
            ...
            Future<Msg> f = Server.this.register(request); // obtained through Socket s

            while(!f.isDone())
                wait();

            response = f.get();
            ...
        }
    }
}

複製されたサービスを実装しています [複数のサーバー、クライアントは w. 単一のサーバー インスタンスの場合、作成/更新/削除操作は、メッセージ配信のグローバルな順序を保証する調整サービスによって分散されます]。

クライアントがサーバー インスタンスへの新しい接続を確立すると、すべての通信は専用Workerインスタンス [読み取り要求をローカルで処理し、を使用して C/U/D 操作をブロードキャストするServer.this.register(...)] を介してチャネルされます。

registerそれ自体は基本的に、将来のローカル処理/応答のために要求を記​​録し、Msgオブジェクトを調整サービスに転送します。

サービスは, ... をMsg介してオブジェクトを再配信し、カプセル化されたタスクを処理した後、最初にクライアント要求を受信したインスタンスに、対応する応答を渡すように通知する必要があります。deliverWorker

特定の理由で、私のデザインが壊れているようです... - w/o synchronized(this)[in Worker#run()], wait()will not wait; を使用するとsynchronized(this)、「ブロックされた」インスタンスのロックが解除さnotifyAll()れません。Server#deliver(...)Worker

簡単に言えば、結局のところ、私はあなたの助けが必要です... (a): の基本を理解するため、wait/notify/notifyAllまたは (b): デザインを改善するため...または (c): (a)および(b ) )。

4

1 に答える 1

2

スレッド呼び出しwait/notifyには、これらのメソッドが呼び出されるオブジェクトのロックが必要です。そうしないと、例外が発生します。一般的な形式で、任意のオブジェクトを想定すると、次のようになります。

final Object obj = new Object();
...

synchronized(obj) {
    while(/* condition */) {
       obj.wait();
    }
}

waityourが によって解放されない理由は、呼び出しているオブジェクトとは異なるオブジェクトnotifyの内部で実行しているためです。したがって、それらはペアになっていません。両方の呼び出しに同じインスタンスを使用する必要があります。のインスタンス内で実行し、 のインスタンス内で実行しているため、同じオブジェクトを参照していません。this.wait()notifythis.wait()Workerthis.notifyAll()Serverthis

クラスとスレッド間で見える単一の同期オブジェクトを作成し、そのオブジェクトで同期する必要があります。

于 2012-07-17T19:16:38.887 に答える