// multiple server instances (replicas), coordinated using MsgCoordinationService
public class Server {
ConcurrentHashMap<TxID,Future<Msg>> local_registry = new ...
MsgCoordinationService coordination_service = new ..
...
// Socket instance to communicate with a client...
public void accept(Socket s) {
new Thread(new Worker(s)).start();
}
// propose msg to coordination service, register demand to respond to client in local registry
public Future<Msg> register(Msg m) {
FutureMsg f = new MsgFuture(); // Future handle w. reference to an empty Msg object [= response]
TxID uniqueID = coordination_service.propose(s); // transaction ID
local_registry.add(uniqueID, f);
return f;
}
// called by coordination service, guaranteeing a global order on msg deliveries
public synchronized void deliver(TxID id, Msg m) {
... process Msg object [request]
... if local_registry.contains(id), 'compile' response
(using the Msg object from FutureMsg f, f.get() - f.isDone() when a certain Msg flag has been set)
___ now:
... notify waiting 'Worker' threads to check whether their 'Future' object isDone()
}
private class Worker implements Runnable {
...
public void run() {
...
Future<Msg> f = Server.this.register(request); // obtained through Socket s
while(!f.isDone())
wait();
response = f.get();
...
}
}
}
複製されたサービスを実装しています [複数のサーバー、クライアントは w. 単一のサーバー インスタンスの場合、作成/更新/削除操作は、メッセージ配信のグローバルな順序を保証する調整サービスによって分散されます]。
クライアントがサーバー インスタンスへの新しい接続を確立すると、すべての通信は専用Workerインスタンス [読み取り要求をローカルで処理し、を使用して C/U/D 操作をブロードキャストするServer.this.register(...)] を介してチャネルされます。
registerそれ自体は基本的に、将来のローカル処理/応答のために要求を記録し、Msgオブジェクトを調整サービスに転送します。
サービスは, ... をMsg介してオブジェクトを再配信し、カプセル化されたタスクを処理した後、最初にクライアント要求を受信したインスタンスに、対応する応答を渡すように通知する必要があります。deliverWorker
特定の理由で、私のデザインが壊れているようです... - w/o synchronized(this)[in Worker#run()], wait()will not wait; を使用するとsynchronized(this)、「ブロックされた」インスタンスのロックが解除さnotifyAll()れません。Server#deliver(...)Worker
簡単に言えば、結局のところ、私はあなたの助けが必要です... (a): の基本を理解するため、wait/notify/notifyAllまたは (b): デザインを改善するため...または (c): (a)および(b ) )。