0

私は Kefir (または Bacon.js、お気に入りを選んでください) を使用してリアルタイム データを含む個人的なプロジェクトに取り組んでおり、データをデータベースに記録して ID を追加し、オブジェクトを渡す必要があるところまで来ました。チェーンの id を使用します。実際にデータベース (NeDB) にデータを挿入することは問題ではなく、レコードがデータベースに挿入されている間、コールバックを使用して実行を継続し、この動作を回避する方法です。

非常に単純化された例:

解析されたデータをバス/プールにダンプするデバイスがいくつかあるとします。

function Position(data) {
    this.id = null;
    this.longitude = data.longitude;
    this.latitude = data.latitude;
}

self.positionDataPool.map(function(position)) {  // is this even what really needs to be done?
    // unsure what to do here {
        self.db.insert {
            longitude: position.longitude
            , latitude: position.latitude
        }, function(e, newRecord) {
            if(e) { ... }
            , else {
                position.id = newRecord._id;
                return position;
            }
        }
    //}
})
.filter(function(position) {
    // the position without an id is passed here
    ...
});

これはマップ機能の不適切または不適切な使用であると思われますが、いくつかのことを試した後、アイデアが出てきました。ご意見、ご提案、またはヘルプをいただければ幸いです。

私の解決策

(すでに行ったことに加えて)さらに多くの読書と実験を行い、日常的にストリーム処理を行っていた日々に戻った後、次の解決策を思いつきました。これは最も効率的ではないかもしれませんが、このソリューションでは、複数のイベント ソースをデータ プール (図示せず) にプラグインすることで、複数のソースから入力を受け取ります。オブジェクト/データに対して単一の操作を実行するために、まったく新しいストリームが作成されます。ここでは拡張性が目的ではありませんでしたが、これにより、複数のソースがストリームからデータをフィルターに直接ダンプするのではなく、監視できるようになります。最後に、処理されたストリームからのデータがフィルタリングされ、必要な結果のみが表示されます。

self.savedPositionDataStream = Kefir.stream(function(emitter) {
    self.positionDataPool.onValue(function(val) {
        self.db.insert {
            longitude: position.longitude
            , latitude: position.latitude
        }, function(e, newRecord) {
            val.id = newRecord._id;
            emitter.emit(val);
        }
    });
});

self.filteredPositionData = savedPositionDataStream.filter(...);
4

1 に答える 1

2

少なくとも Bacon.js ではBacon.fromNodeCallback、挿入呼び出しの結果をストリームにラップするために使用できます。お気に入り

Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted)

もちろん、これはBacon.fromBinderまたは同様の方法で行うことができますKefir.streamが、fromNodeCallbackヘルパーは成功/エラー値を自動的に処理し、それらをストリーム イベントに適切に変換するため、これをより簡単にします。

そしてflatMapの代わりにmap、挿入を行い、結果をストリームとして提供します:

let insertionResultE = self.positionDataPool.flatMap(val => 
  Bacon.fromNodeCallback(self.db, "insert", val).map("._id")
)
insertionResultE.log("insertion result")

同様のアプローチがケフィアにも適用されます。重要なのは、 では非同期で失敗する可能性のある計算をmap実行できないということですが、 では実行できますflapMap

insertionResultEここでの注意点は、アクティブ化するには少なくとも 1 つのサブスクライバーを追加する必要があることです。上記の例でlogは、それを行います。

于 2015-10-28T07:17:40.037 に答える