現在、cassandra を使用して POC に取り組んでいます。
私がやりたいこと: 可変数のセンサーがあり (前もって知られていません)、各センサーは 1 秒に数回何らかの値を提供します。私がやりたいことは、毎秒、分、時間などの平均、最小、最大、速度を計算することです.
データをモデル化する方法: 複数の列ファミリーがあるため。raw、avg-5-second、avg-60-second など。rowid は、machinex:memory などのセンサー ID です。列名はタイムスタンプで、列の値は測定値です。
私がこれまでに持っていること: 単一のセンサー (単一の行 ID) のデータを生成するシステムを作成しました。そして、その特定の行 ID のデータのスライスを取得し、結果を集計された列ファミリに格納するいくつかのタスクがあります。
例:
Cluster cluster = HFactory.getOrCreateCluster("test-cluster", "localhost:9160"); キースペース keyspace = createKeyspace(cluster, "Measurements");
String machine1 = "foo:dev:192.168.1.1:5701";
String rowId = machine1 + ":operationCount";
DatapointRepository rawRepo = new DatapointRepository(cluster, keyspace, "Measurements");
DatapointRepository avgSecondRepo = new DatapointRepository(cluster, keyspace, "averageSecond");
DatapointRepository avgFiveSecondRepo = new DatapointRepository(cluster, keyspace, "averageFiveSeconds");
DatapointRepository maxFiveSecondRepo = new DatapointRepository(cluster, keyspace, "maxFiveSeconds");
ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);
scheduler.scheduleAtFixedRate(
new RollupRunnable(
rawRepo,
avgSecondRepo,
rowId,
"average 1 second",
new AggregateFunctionFactory(AverageFunction.class)),
0, 1, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
new RollupRunnable(
avgSecondRepo,
avgFiveSecondRepo,
rowId,
"average 5 seconds",
new AggregateFunctionFactory(AverageFunction.class)),
0, 5, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
new RollupRunnable(
avgSecondRepo,
maxFiveSecondRepo,
rowId,
"maximum 5 seconds",
new AggregateFunctionFactory(MaximumFunction.class)),
0, 5, TimeUnit.SECONDS);
long startTime = System.currentTimeMillis();
new GenerateMeasurementsThread(rawRepo, machine1).start();
Thread.sleep(30000);
long endTime = System.currentTimeMillis();
System.out.println("average seconds:");
print(avgSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("average 5 seconds:");
print(avgFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("max 5 seconds:");
print(maxFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("finished");
System.exit(0);
したがって、センサーが 1 つ (つまり行 ID が 1 つ) の場合、またはセンサーがどれかを事前に知っていれば、すべて正常に動作します。問題は、可変数のセンサーがあり、新しいセンサーがいつでも表示され、古いセンサーがデータの送信を停止する可能性があることです。
私の大きな疑問は、特定の期間内にどのセンサーが利用可能かをどのように把握できるかということです。それがわかったら、センサーごとに集計タスクを作成できます。