次の Dart コード スニペットがあるとします。
Stream stream1 = new Stream.periodic(new Duration(seconds: 1), (n) => n)
.take(10)
.asBroadcastStream();
stream1.listen((n) => print("stream1 : $n"),
onError : (err) => print("stream1 : $err"),
onDone : () => print("stream1 : done"),
cancelOnError : false);
Stream stream2 = stream1.where((n) => n % 2 == 0).take(2);
stream2.listen((n) => print("stream2 : $n"),
onError : (err) => print("stream2 : $err"),
onDone : () => print("stream2 : done"),
cancelOnError : false);
Stream stream3 = stream1.where((n) => n % 2 != 0).take(2);
stream3.listen((n) => print("stream3 : $n"),
onError : (err) => print("stream3 : $err"),
onDone : () => print("stream3 : done"),
cancelOnError : false);
StreamController controller = new StreamController.broadcast();
controller.addStream(stream2)
.then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
onError : (err) => print("composite stream : $err"),
onDone : () => print("composite stream : done"),
cancelOnError : false);
次の出力が得られます。
stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream1 : 3
stream1 : 4
stream1 : 5
stream1 : 6
stream1 : 7
stream1 : 8
stream1 : 9
stream1 : done
stream2 : done
stream3 : done
この出力から理解できないことがいくつかあります。
stream2
それぞれのエントリが 1 つしかないのはなぜstream3
ですか。で作成した複合ストリームは、およびStreamController
からのイベントの 1 つを消費しますか? この動作は私には奇妙に思えます。何か不足していますか?stream2
stream3
stream2
完了したときにstream3
のみ完了するのはなぜstream1
ですか?.take(10)
これは、両方が制限されている場合に期待する自然な動作ではなく、 onの動作と矛盾しstream1
ます。.take(10)
onstream1
を削除するstream2
とstream3
、実際には完了しません。
controller
ソースも追加するように変更するとstream1
(以下のスニペットと出力を参照)、実際には 2 つの要素がアップしたときに自然な位置でstream2
andstream3
が完了しますが、ストリームの 1 つをリッスンしようとしたため、例外も発生します。二回。
StreamController controller = new StreamController.broadcast();
controller.addStream(stream1)
.then((_) => controller.addStream(stream2))
.then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
onError : (err) => print("composite stream : $err"),
onDone : () => print("composite stream : done"),
cancelOnError : false);
stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream2 : 2
stream2 : done
composite stream : 2
stream1 : 3
stream3 : 3
stream3 : done
composite stream : 3
stream1 : 4
composite stream : 4
stream1 : 5
composite stream : 5
stream1 : 6
composite stream : 6
stream1 : 7
composite stream : 7
stream1 : 8
composite stream : 8
stream1 : 9
stream1 : done
composite stream : 9
Uncaught Error: Bad state: Stream has already been listened to.
Stack Trace:
#0 _StreamController._subscribe (dart:async/stream_controller.dart:151:7)
#1 _ControllerStream._createSubscription (dart:async/stream_controller.dart:259:157)
#2 _StreamImpl.listen (dart:async/stream_impl.dart:260:58)
#3 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#4 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#5 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#6 _AsBroadcastStream.listen (dart:async/stream_impl.dart:466:37)
#7 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#8 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#9 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#10 _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#11 _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#12 _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#13 _AddStreamState._AddStreamState (dart:async/stream_controller.dart:300:133)
#14 _BroadcastStreamController.addStream (dart:async/broadcast_stream_controller.dart:140:27)
#15 main.<anonymous closure> (file:///C:/SourceCode/personal/SteamTest/lib/streamtest.dart:38:38)
#16 _ThenFuture._zonedSendValue (dart:async/future_impl.dart:371:24)
#17 _TransformFuture._sendValue.<anonymous closure> (dart:async/future_impl.dart:348:48)
#18 _ZoneBase._runInZone (dart:async/zone.dart:82:17)
#19 _ZoneBase._runUnguarded (dart:async/zone.dart:102:22)
#20 _ZoneBase.executeCallback (dart:async/zone.dart:58:23)
#21 _TransformFuture._sendValue (dart:async/future_impl.dart:348:26)
#22 _FutureImpl._setValueUnchecked (dart:async/future_impl.dart:184:26)
#23 _FutureImpl._asyncSetValue.<anonymous closure> (dart:async/future_impl.dart:218:25)
#24 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#25 _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#26 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#27 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#28 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
Unhandled exception:
Bad state: Stream has already been listened to.
#0 _DefaultZone.handleUncaughtError.<anonymous closure> (dart:async/zone.dart:146:7)
#1 _asyncRunCallback (dart:async/event_loop.dart:9:15)
#2 _asyncRunCallback (dart:async/event_loop.dart:13:7)
#3 _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#4 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#5 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#6 _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#7 _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#8 _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)
誰かがここで何が起こっているのかを理解するのを手伝ってくれますか?
ありがとう、