-2

これは私のコードです。分散システムで呼び出しを同期する方法を実装することになっています。これは、メッセージを送信して ack を待つ部分です。ネットワークとピアは信頼できると想定されています。

ただし、この例外が発生し続け、その理由が理解できないため、私の問題はローカル同期に関連しています。

Exception in thread "Thread-7" java.lang.IllegalMonitorStateException
at java.lang.Object.wait(Native Method)
at java.lang.Object.wait(Object.java:503)
at distributed.PeerManager.sendAllWithAck(PeerManager.java:212)
at distributed.TokenManager.onTokenReceived(TokenManager.java:67)
at communication.TokenMessage.execute(TokenMessage.java:21)
at distributed.ListenThread.run(ListenThread.java:38)
Exception in thread "Thread-8" java.lang.IllegalMonitorStateException
    at java.lang.Object.notify(Native Method)
    at distributed.AckWaiter.run(AckWaiter.java:74)

AckWaiter クラスは次のとおりです。

public class AckWaiter extends Thread {
    public int counter;
    PeerManager pm;
    Message m;
    public Object waiter;
    public int parentPort;

    public AckWaiter(PeerManager pm, Message m, int n,int parentPort) {
        counter = n;
        this.pm = pm;
        this.m = m;
        this.parentPort=parentPort;
        waiter=new Object();
    }

    public synchronized void notifyAck() {
        counter--;
            notify();

    }

    @Override
    public synchronized void run(){
        pm.sendAllExceptMe(m);

        try {
            BufferedReader inFromClient=new BufferedReader(new
                    InputStreamReader(pm.listener.socketMap.get(parentPort).getInputStream()));
            while(counter>0){
                    wait(100);
                    Message m=CustomMarshaller.getCustomMarshaller().unmarshal(inFromClient.readLine());
                    if(m==null){
                        continue;
                    }
                    m.execute(pm);
                }
                } catch (IOException | InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        } 


        waiter.notify();
        return;
    }
}

そして、これがスレッドをブロックするはずの呼び出しです

public synchronized void sendAllWithAck(Message m){
    if(aw!=null){
        throw new RuntimeException();
    }
    aw=new AckWaiter(this,m,connectionList.size()-1,m.sender.getPort());
    aw.start();
    try {
        aw.waiter.wait();
    } catch (InterruptedException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    aw=null;
}

「sendAllWithAck」は AckWaiter スレッドを開始し、「n」個の ACK を受信したときに通知されるオブジェクト「waiter」を待機します。

「notifyAck」メソッドは、ACK メッセージを受信したときに通信層によって呼び出されます。

「sendAllWithAck」は、ソケットからの読み取りを担当するスレッドによって呼び出される可能性があるため、AckWaiter は独自のスレッドです。n 個のピアがあり、すべてのピアには他のすべてのピア用のオープン ソケット (インバウンド メッセージを処理するスレッドを使用) があるため、受信したメッセージに応答して ACK を待つと、そのピアから ACK を読み取ることができません。ピア (parentPort はそのピアの識別子であるため、このスレッド内で定期的に ACK をチェックできます)。

この例外が構造上の問題によるものである場合は、アーキテクチャを変更してもかまいませんが、通信レイヤーを変更せずに別の方法で処理する方法がわかりません。それは面倒です。

4

1 に答える 1