1

私は3つのストリームを持っています:

TStream<Double> tempReadings=topology.poll(tempSensor, 10, TimeUnit.SECONDS);
TStream<Double> co2Readings=topology.poll(co2Sensor, 10, TimeUnit.SECONDS);
TStream<Boolean> stationaryReadings=topology.poll(stationarySensor, 10, TimeUnit.SECONDS);

現在、3 つの JSON オブジェクトから 3 つの個別のデバイス イベントを作成しています。

TStream<JsonObject> tempJson=tempReadings.map(tuple->{
    JsonObject json=new JsonObject();
    json.addProperty("Temperature", tuple);
    return json;
});
TStream<JsonObject> co2Json=co2Readings.map(tuple->{
    JsonObject json=new JsonObject();
    json.addProperty("C02Level", tuple);
    return json;
});
TStream<JsonObject> sensoryJson=stationaryReadings.map(tuple->{
    JsonObject json=new JsonObject();
    json.addProperty("isStationary", tuple);
    return json;
});

代わりに、これらのストリームを結合し、3 つのプロパティ (Temperature、C02Level、isStationary) を持つ 1 つの JSON オブジェクトを作成して、1 つのイベントを作成したいと考えています。

4

2 に答える 2

1

ストリームを結合することはできますが、それはタプルを次々と配置するだけであり、同じタイプのストリームを使用する必要があります。

3 つのプロパティすべてを一度に読み取りたい場合は、"readings" オブジェクトを返すセンサーを作成できます。

class Reading {
    Double temperature;
    Double c02Level;
    Boolean isStationary;
}
于 2016-12-25T18:06:05.303 に答える
1

この場合、「単一のポーリングを組み合わせた読み取りタプル」アプローチがおそらく最適です。

より一般的には、PlumbingStreams.barrier()を使用して、複数のストリームの対応するタプルをマージできます。何かのようなもの:

TStream<JsonObject> combinedReadings =
    PlumbingStreams.barrier(Arrays.asList(tempJson,co2Json,sensoryJson))
    .map(list -> combineTuples(list));


static JsonObject combineTuples(JsonObject list...) {
  JsonObject jo = new JsonObject();
  for (JsonObject j : list) {
    for (Entry<String,JsonElement> e : j.entrySet()) {
      jo.addProperty(e.getKey(), e.getValue());
    }
  }
  return jo;
}
于 2017-02-02T15:50:57.897 に答える