// multiple server instances (replicas), coordinated using MsgCoordinationService
public class Server {
ConcurrentHashMap<TxID,Future<Msg>> local_registry = new ...
MsgCoordinationService coordination_service = new ..
...
// Socket instance to communicate with a client...
public void accept(Socket s) {
new Thread(new Worker(s)).start();
}
// propose msg to coordination service, register demand to respond to client in local registry
public Future<Msg> register(Msg m) {
FutureMsg f = new MsgFuture(); // Future handle w. reference to an empty Msg object [= response]
TxID uniqueID = coordination_service.propose(s); // transaction ID
local_registry.add(uniqueID, f);
return f;
}
// called by coordination service, guaranteeing a global order on msg deliveries
public synchronized void deliver(TxID id, Msg m) {
... process Msg object [request]
... if local_registry.contains(id), 'compile' response
(using the Msg object from FutureMsg f, f.get() - f.isDone() when a certain Msg flag has been set)
___ now:
... notify waiting 'Worker' threads to check whether their 'Future' object isDone()
}
private class Worker implements Runnable {
...
public void run() {
...
Future<Msg> f = Server.this.register(request); // obtained through Socket s
while(!f.isDone())
wait();
response = f.get();
...
}
}
}
複製されたサービスを実装しています [複数のサーバー、クライアントは w. 単一のサーバー インスタンスの場合、作成/更新/削除操作は、メッセージ配信のグローバルな順序を保証する調整サービスによって分散されます]。
クライアントがサーバー インスタンスへの新しい接続を確立すると、すべての通信は専用Worker
インスタンス [読み取り要求をローカルで処理し、を使用して C/U/D 操作をブロードキャストするServer.this.register(...)
] を介してチャネルされます。
register
それ自体は基本的に、将来のローカル処理/応答のために要求を記録し、Msg
オブジェクトを調整サービスに転送します。
サービスは, ... をMsg
介してオブジェクトを再配信し、カプセル化されたタスクを処理した後、最初にクライアント要求を受信したインスタンスに、対応する応答を渡すように通知する必要があります。deliver
Worker
特定の理由で、私のデザインが壊れているようです... - w/o synchronized(this)
[in Worker#run()
], wait()
will not wait; を使用するとsynchronized(this)
、「ブロックされた」インスタンスのロックが解除さnotifyAll()
れません。Server#deliver(...)
Worker
簡単に言えば、結局のところ、私はあなたの助けが必要です... (a): の基本を理解するため、wait/notify/notifyAll
または (b): デザインを改善するため...または (c): (a)および(b ) )。