0

現在、cassandra を使用して POC に取り組んでいます。

私がやりたいこと: 可変数のセンサーがあり (前もって知られていません)、各センサーは 1 秒に数回何らかの値を提供します。私がやりたいことは、毎秒、分、時間などの平均、最小、最大、速度を計算することです.

データをモデル化する方法: 複数の列ファミリーがあるため。raw、avg-5-second、avg-60-second など。rowid は、machinex:memory などのセンサー ID です。列名はタイムスタンプで、列の値は測定値です。

私がこれまでに持っていること: 単一のセンサー (単一の行 ID) のデータを生成するシステムを作成しました。そして、その特定の行 ID のデータのスライスを取得し、結果を集計された列ファミリに格納するいくつかのタスクがあります。

例:

Cluster cluster = HFactory.getOrCreateCluster("test-cluster", "localhost:9160"); キースペース keyspace = createKeyspace(cluster, "Measurements");

String machine1 = "foo:dev:192.168.1.1:5701";
String rowId = machine1 + ":operationCount";

DatapointRepository rawRepo = new DatapointRepository(cluster, keyspace, "Measurements");
DatapointRepository avgSecondRepo = new DatapointRepository(cluster, keyspace, "averageSecond");
DatapointRepository avgFiveSecondRepo = new DatapointRepository(cluster, keyspace, "averageFiveSeconds");
DatapointRepository maxFiveSecondRepo = new DatapointRepository(cluster, keyspace, "maxFiveSeconds");

ScheduledExecutorService scheduler = new ScheduledThreadPoolExecutor(10);
scheduler.scheduleAtFixedRate(
        new RollupRunnable(
        rawRepo,
        avgSecondRepo,
        rowId,
        "average 1 second",
                new AggregateFunctionFactory(AverageFunction.class)),
        0, 1, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
        new RollupRunnable(
        avgSecondRepo,
        avgFiveSecondRepo,
        rowId,
        "average 5 seconds",
                new AggregateFunctionFactory(AverageFunction.class)),
        0, 5, TimeUnit.SECONDS);
scheduler.scheduleAtFixedRate(
        new RollupRunnable(
        avgSecondRepo,
        maxFiveSecondRepo,
        rowId,
        "maximum 5 seconds",
                new AggregateFunctionFactory(MaximumFunction.class)),
        0, 5, TimeUnit.SECONDS);


long startTime = System.currentTimeMillis();

new GenerateMeasurementsThread(rawRepo, machine1).start();

Thread.sleep(30000);

long endTime = System.currentTimeMillis();

System.out.println("average seconds:");
print(avgSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("average 5 seconds:");
print(avgFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");
System.out.println("max 5 seconds:");
print(maxFiveSecondRepo, startTime, endTime, machine1 + ":operationCount");


System.out.println("finished");
System.exit(0);

したがって、センサーが 1 つ (つまり行 ID が 1 つ) の場合、またはセンサーがどれかを事前に知っていれば、すべて正常に動作します。問題は、可変数のセンサーがあり、新しいセンサーがいつでも表示され、古いセンサーがデータの送信を停止する可能性があることです。

私の大きな疑問は、特定の期間内にどのセンサーが利用可能かをどのように把握できるかということです。それがわかったら、センサーごとに集計タスクを作成できます。

4

2 に答える 2

0

@userxxxx

「私はあなたの提案を実装し、1 つのバグを除いて動作します。同じ「時間」の複数のセンサー データポイントがある場合、最後に保存されたデータポイントの名前のみが表示されます。」

簡単な修正:

rowId = xxx // whatever value, doest not really matter

column name = composite of(timestamp,sensorId)

column value = nothing

列名をタイムスタンプとセンサー ID の合成として設定することで、まったく同時に複数のセンサーが存在する場合に対応できます。

センサー ID 情報は列に直接格納されるため、列の値は不要になります。これは、値のない列ファミリーと呼ばれます

このようなテーブルを作成する CQL スクリプト

CREATE TABLE sensor_index_by_date
(

   row_id text, // whatever
   date timestamp,
   sensor_id bigint,
   PRIMARY KEY(rowId,date,sensor_id)
);
于 2013-09-07T16:32:44.650 に答える