私は3つのストリームを持っています:
TStream<Double> tempReadings=topology.poll(tempSensor, 10, TimeUnit.SECONDS);
TStream<Double> co2Readings=topology.poll(co2Sensor, 10, TimeUnit.SECONDS);
TStream<Boolean> stationaryReadings=topology.poll(stationarySensor, 10, TimeUnit.SECONDS);
現在、3 つの JSON オブジェクトから 3 つの個別のデバイス イベントを作成しています。
TStream<JsonObject> tempJson=tempReadings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("Temperature", tuple);
return json;
});
TStream<JsonObject> co2Json=co2Readings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("C02Level", tuple);
return json;
});
TStream<JsonObject> sensoryJson=stationaryReadings.map(tuple->{
JsonObject json=new JsonObject();
json.addProperty("isStationary", tuple);
return json;
});
代わりに、これらのストリームを結合し、3 つのプロパティ (Temperature、C02Level、isStationary) を持つ 1 つの JSON オブジェクトを作成して、1 つのイベントを作成したいと考えています。