6

JGroups が特定のサーバーをコーディネーターとして使用するよう強制する方法を探しています。そのサーバーが存在しない場合は、指定されたサーバーがクラスターに再参加し、コーディネーターになるまで新しいコーディネーターを選出します。

この場合、更新のためにトピックをリッスンするコーディネーターによってクラスターにプッシュする情報がありますが、これらの更新のフェッチと処理はリソースを大量に消費する可能性があるため、外部に何かをサーバーに送信したくありません。そのため、クラスターの前のロードバランサーでは、コーディネーターに送信しないように設定しています。ただし、Coordinator はランダムに選出されるため、基本的には、1 台のマシンだけがそこにあるまでクラスターをシャットダウンしてから、残りのクラスターのバックアップを開始する必要があります。

4

3 に答える 3

5

現在、これを行う方法はありません。Jgroups は、コーディネーターがグループ内の任意のノードになることができることを確認するためにかなりの時間を費やしました。グループ メンバーシップ リストの正常性を維持および監視するすべてのタスクは、グループ内のすべてのメンバー間で共有され、コーディネーターの職務がコーディネーターのパフォーマンスに大きく影響しないようにします。標準の GMS (Group MembershipService) プロトコル スタック クラスは、コーディネーターの選択を担当します。現在、これはビュー リストの最初のホストにすぎません。

この動作を実現するには、独自のプロトコル スタックを実装する必要があります。興味深いことに、私は Jgroups のプロトコル スタックに取り組んでいます。これは、あなたが求めているものをほぼ実装していますが、プライム タイムの準備ができていません。

ただし、他の人はこの問題に頭を悩ませているかもしれません。jgroups メーリング リストに投稿して、同じ質問をすることをお勧めします。

于 2012-06-05T22:46:32.077 に答える
1

任意のノードをコーディネーターに設定できます。: github サンプル

そして、すべてのノードで完了した変更と完全なコードの同期ブロックを追加しています。

public static final String GMS_DELTA_VIEW_FIELD_NAME = "use_delta_views";

/**
 * Change coordinator to {@code desiredCoordinator}. Must be invoked from coordinator.
 * @param desiredCoordinator
 * @return {@code true} if changes success, {@code false} overwise 
 */
boolean changeCoordinator(JChannel currentChannel, Address desiredCoordinator) {

    if(!Util.isCoordinator(currentChannel.getAddress)) {
        throw new RuntimeException("The current node is not coordinator.");
    }

    ArrayList<Address> newMembersOrder = Lists.newArrayList(currentView.getMembers());        

    // Switch desired node to first place
    Collections.swap(newMembersOrder, 0, newMembersOrder.indexOf(desiredCoordinator));        

    // Create new view
    long newId = currentView.getViewId().getId() + 1;
    View newView = new View(newMembersOrder.get(0), newId, newMembersOrder);

    GMS gms = (GMS)clusterChannel.getProtocolStack().findProtocol(GMS.class);
    CustomProtocol protocol = new CustomProtocol(newMembersOrder.stream()
            .filter(item -> !item.equals(currentChannel.getAddress()))
            .collect(Collectors.toSet()));

    boolean oldUseDeltaViews = (Boolean)gms.getValue(GMS_DELTA_VIEW_FIELD_NAME);
    try {
        // Disable using_delta_views at GMS
        gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, false);

        // Insert custom protocol below GMS for synchronizing with VIEW_ACK events
        currentChannel.getProtocolStack().insertProtocolInStack(protocol, gms, ProtocolStack.BELOW);
        gms.castViewChange(newView, null, newMembersOrder);

        // Wait no more than 30 seconds to all VIEW_ACK responses
        if (!protocol.collector.waitForAllAcks(TimeUnit.SECONDS.toMillis(30))) {                
            return false;
        }

        return true;
    }
    finally {
        // Repair old state
        gms.setValue(GMS_DELTA_VIEW_FIELD_NAME, oldUseDeltaViews);
        currentChannel.getProtocolStack().removeProtocol(protocol);
    }
}

private class CustomProtocol extends Protocol implements UpHandler {

    AckCollector collector;

    public CustomProtocol(Collection<Address> waitedAddresses) {
        collector = new AckCollector(waitedAddresses);
    }

    @Override
    public Object up(Event evt) {

        if(evt.getType() == Event.MSG) {
            final Message msg=(Message)evt.getArg();
            GmsHeader hdr=(GmsHeader)msg.getHeader(proto_id);
            if(hdr != null && hdr.getType() == GmsHeader.VIEW_ACK) {                    
                collector.ack(msg.getSrc());
            }
        }

        return super.up(evt);
    }
}
于 2015-11-27T16:11:28.517 に答える