2

私は、分散アトミック ロング カウンターを継続的に監視するプロセスに取り組んできました。ZkClient次のクラスのメソッドを使用して毎分監視しますgetCounter。実際、私は複数のスレッドを実行しており、それぞれが Zookeeper ノードに格納された異なるカウンター (分散アトミック ロング) を監視しています。getCounter各スレッドは、メソッドのパラメーターを介してカウンターのパスを指定します。

public class TagserterZookeeperManager {

public enum ZkClient {
    COUNTER("10.11.18.25:2181");  // Integration URL

    private CuratorFramework client;
    private ZkClient(String servers) {
        Properties props = TagserterConfigs.ZOOKEEPER.getProperties();
        String zkFromConfig = props.getProperty("servers", "");
        if (zkFromConfig != null && !zkFromConfig.isEmpty()) {
            servers = zkFromConfig.trim();
        }
        ExponentialBackoffRetry exponentialBackoffRetry = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.newClient(servers, exponentialBackoffRetry);
        client.start();
    }

    public CuratorFramework getClient() {
        return client;
    }
}

public static String buildPath(String ... node) {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < node.length; i++) {
        if (node[i] != null && !node[i].isEmpty()) {
            sb.append("/");
            sb.append(node[i]);
        }
    }
    return sb.toString();
}

public static DistributedAtomicLong getCounter(String taskType, int hid, String jobId, String countType) {
    String path = buildPath(taskType, hid+"", jobId, countType);
    Builder builder = PromotedToLock.builder().lockPath(path + "/lock").retryPolicy(new ExponentialBackoffRetry(10, 10));
    DistributedAtomicLong count = new DistributedAtomicLong(ZkClient.COUNTER.getClient(), path, new RetryNTimes(5, 20), builder.build());
    return count;
}

}

スレッド内から、これは私がこのメソッドを呼び出す方法です:

    DistributedAtomicLong counterTotal = TagserterZookeeperManager
                        .getCounter("testTopic", hid, jobId, "test");

スレッドが数時間実行された後、ある段階で、カウントを読み取ろうとするメソッドorg.apache.zookeeper.KeeperException$ConnectionLossException内で次の例外が発生し始めたようです。getCounter

org.apache.zookeeper.KeeperException$ConnectionLossException: org.apache.zookeeper.KeeperException.create(KeeperException.java:99) の /contentTaskProd の KeeperErrorCode = ConnectionLoss org.apache.zookeeper.KeeperException.create(KeeperException.java:51) org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1045) で org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1073) で org.apache.curator.utils.ZKPaths.mkdirs(ZKPaths.java) :215) org.apache.curator.utils.EnsurePath$InitialHelper$1.call(EnsurePath.java:148) で org.apache.curator.RetryLoop.callWithRetry(RetryLoop.java:107) で org.apache.curator.utils .EnsurePath$InitialHelper.ensure(EnsurePath.java:141) at org.apache.curator.utils.EnsurePath.ensure(EnsurePath.java:99) at org.apache.curator.framework.recipes.atomic.DistributedAtomicValue.getCurrentValue(DistributedAtomicValue.java:254) org.apache.curator.framework.recipes.atomic.DistributedAtomicValue.get(DistributedAtomicValue.java:91) org.apache.curator.framework.レシピ.atomic.DistributedAtomicLong.get(DistributedAtomicLong.java:72) ...

しばらくの間、この例外が発生し続け、最終的に OutOfMemory エラーを引き起こし、プロセス全体が救済される内部メモリ リークを引き起こしているように感じます。これの理由が何であるか誰にも分かりますか?Zookeeper が接続損失の例外を突然スローし始めるのはなぜですか? プロセスが解決した後、私が作成した別の小さなコンソール プログラムを介して手動で Zookeeper に接続できます (これもキュレーターを使用)。

4

1 に答える 1

2

Zookeeper でノードを監視するには、 NodeCachecuratorを使用できますが、これでは接続の問題は解決されません.... しかし、1 分に 1 回ノードをポーリングする代わりに、ノードが変更されたときにプッシュ イベントを取得できます。

私の経験では、NodeCache接続の切断と再開を非常にうまく処理します。

于 2016-01-11T20:30:52.110 に答える