私はオブジェクトでsourceStream
構成されています。BaseData
このストリームをn
-amount の異なるストリームにフォークし、各BaseData
オブジェクトを好みに合わせてフィルタリングおよび変換したいと考えています。
最終的にn
は、特定のタイプのみを含むストリームが必要であり、将来データが削除または追加される可能性があるため、フォークされたストリームの長さはさまざまです。
私はこれを次の方法で設定できると思いましたfork
:
import * as _ from 'highland';
interface BaseData {
id: string;
data: string;
}
const sourceStream = _([
{id: 'foo', data: 'poit'},
{id: 'foo', data: 'fnord'},
{id: 'bar', data: 'narf'}]);
const partners = [
'foo',
'bar',
];
partners.forEach((partner: string) => {
const partnerStream = sourceStream.fork();
partnerStream.filter((baseData: BaseData) => {
return baseData.id === partner;
});
partnerStream.each(console.log);
});
foo
2 つのストリームがあり、 -stream には 2 つの要素が含まれていると予想しました。
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
-stream には 1 つの要素が含まれます。
{ id: 'bar', data: 'narf' }
それでも、代わりにエラーが発生します。
/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338
throw new Error(
^
Error: Stream already being consumed, you must either fork() or observe()
at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15)
at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10)
at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18)
at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19)
at Array.forEach (native)
at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10)
at Module._compile (module.js:570:32)
at Object.Module._extensions..js (module.js:579:10)
at Module.load (module.js:487:32)
at tryModuleLoad (module.js:446:12)
ストリームを複数のストリームにフォークする方法は?
また、呼び出しをチェーンしようとしましたが、1 つのストリームの結果しか返されません。
partners.forEach((partner: string) => {
console.log(partner);
const partnerStream = sourceStream
.fork()
.filter((item: BaseData) => {
return item.id === partner;
});
partnerStream.each((item: BaseData) => {
console.log(item);
});
});
印刷のみ:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
期待される代わりに:
foo
{ id: 'foo', data: 'poit' }
{ id: 'foo', data: 'fnord' }
bar
{id: 'bar', data: 'narf'}
私が誤解していたのも、すべてだったのかもしれませんfork
。そのドキュメントエントリに従って:
Stream.fork() ストリームをフォークして、バックプレッシャーを共有するコンシューマーを追加できるようにします。複数のコンシューマーにフォークされたストリームは、最も遅いコンシューマーが処理できる速度でのみソースから値を取得します。
注: フォーク間の一貫した実行順序に依存しないでください。この変換は、すべてのフォークが値 foo を処理してから、いずれかが 2 番目の値 bar を処理することを保証するだけです。フォークが foo を処理する順序は保証されません。
ヒント: フォーク内でストリーム値を変更する (またはそうするライブラリを使用する) 場合は注意してください。すべてのフォークに同じ値が渡されるため、1 つのフォークで行われた変更は、その後に実行されるすべてのフォークで表示されます。これに矛盾した実行順序が加わると、微妙なデータ破損のバグが発生する可能性があります。値を変更する必要がある場合は、コピーを作成し、代わりにコピーを変更する必要があります。
非推奨の警告: 現在、ストリームを消費した後に (たとえば、変換を介して) ストリームをフォークすることが可能です。これは、次のメジャー リリースでは不可能になります。ストリームを fork する場合は、常に fork を呼び出してください。
「ストリームをフォークする方法」の代わりに 私の実際の質問は次のとおりです。ハイランドストリームをその場で別のストリームに複製する方法は?