
Given the following Dart code snippet:

Stream stream1 = new Stream.periodic(new Duration(seconds: 1), (n) => n)
                           .take(10)
                           .asBroadcastStream();
stream1.listen((n) => print("stream1 : $n"),
               onError : (err) => print("stream1 : $err"),
               onDone : () => print("stream1 : done"),
               cancelOnError : false);

Stream stream2 = stream1.where((n) => n % 2 == 0).take(2);
stream2.listen((n) => print("stream2 : $n"),
               onError : (err) => print("stream2 : $err"),
               onDone : () => print("stream2 : done"),
               cancelOnError : false);

Stream stream3 = stream1.where((n) => n % 2 != 0).take(2);
stream3.listen((n) => print("stream3 : $n"),
               onError : (err) => print("stream3 : $err"),
               onDone : () => print("stream3 : done"),
               cancelOnError : false);

StreamController controller = new StreamController.broadcast();
controller.addStream(stream2)
  .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
                         onError : (err) => print("composite stream : $err"),
                         onDone : () => print("composite stream : done"),
                         cancelOnError : false);

I get the following output:

stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream1 : 3
stream1 : 4
stream1 : 5
stream1 : 6
stream1 : 7
stream1 : 8
stream1 : 9
stream1 : done
stream2 : done
stream3 : done

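There are a few things about this output that I don't understand.

  1. Why do stream2 and stream3 only have one entry each? Does the composite stream that I created with the StreamController consume one of the events from stream2 and stream3? This behaviour seems strange to me. Am I missing something?

  2. Why do stream2 and stream3 only complete when stream1 completes? This is not the natural behaviour I would expect when both are limited with take, and it is inconsistent with the behaviour of .take(10) on stream1. If I remove .take(10) on stream1, then stream2 and stream3 never actually complete.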

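If I change the controller to also add stream1 as a source (see the snippet and output below), then stream2 and stream3 do complete at their natural positions, once two elements have come through, but I also get an exception because I tried to listen to one of the streams twice.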

StreamController controller = new StreamController.broadcast();
controller.addStream(stream1)
  .then((_) => controller.addStream(stream2))
  .then((_) => controller.addStream(stream3));
controller.stream.listen((n) => print("composite stream : $n"),
                         onError : (err) => print("composite stream : $err"),
                         onDone : () => print("composite stream : done"),
                         cancelOnError : false);

stream1 : 0
stream2 : 0
composite stream : 0
stream1 : 1
stream3 : 1
composite stream : 1
stream1 : 2
stream2 : 2
stream2 : done
composite stream : 2
stream1 : 3
stream3 : 3
stream3 : done
composite stream : 3
stream1 : 4
composite stream : 4
stream1 : 5
composite stream : 5
stream1 : 6
composite stream : 6
stream1 : 7
composite stream : 7
stream1 : 8
composite stream : 8
stream1 : 9
stream1 : done
composite stream : 9
Uncaught Error: Bad state: Stream has already been listened to.
Stack Trace:
#0      _StreamController._subscribe (dart:async/stream_controller.dart:151:7)
#1      _ControllerStream._createSubscription (dart:async/stream_controller.dart:259:157)
#2      _StreamImpl.listen (dart:async/stream_impl.dart:260:58)
#3      _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#4      _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#5      _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#6      _AsBroadcastStream.listen (dart:async/stream_impl.dart:466:37)
#7      _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#8      _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#9      _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#10     _ForwardingStreamSubscription._ForwardingStreamSubscription (dart:async/stream_pipe.dart:53:43)
#11     _ForwardingStream._createSubscription (dart:async/stream_pipe.dart:35:16)
#12     _ForwardingStream.listen (dart:async/stream_pipe.dart:32:31)
#13     _AddStreamState._AddStreamState (dart:async/stream_controller.dart:300:133)
#14     _BroadcastStreamController.addStream (dart:async/broadcast_stream_controller.dart:140:27)
#15     main.<anonymous closure> (file:///C:/SourceCode/personal/SteamTest/lib/streamtest.dart:38:38)
#16     _ThenFuture._zonedSendValue (dart:async/future_impl.dart:371:24)
#17     _TransformFuture._sendValue.<anonymous closure> (dart:async/future_impl.dart:348:48)
#18     _ZoneBase._runInZone (dart:async/zone.dart:82:17)
#19     _ZoneBase._runUnguarded (dart:async/zone.dart:102:22)
#20     _ZoneBase.executeCallback (dart:async/zone.dart:58:23)
#21     _TransformFuture._sendValue (dart:async/future_impl.dart:348:26)
#22     _FutureImpl._setValueUnchecked (dart:async/future_impl.dart:184:26)
#23     _FutureImpl._asyncSetValue.<anonymous closure> (dart:async/future_impl.dart:218:25)
#24     _asyncRunCallback (dart:async/event_loop.dart:9:15)
#25     _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#26     _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#27     _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#28     _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)


Unhandled exception:
Bad state: Stream has already been listened to.
#0      _DefaultZone.handleUncaughtError.<anonymous closure> (dart:async/zone.dart:146:7)
#1      _asyncRunCallback (dart:async/event_loop.dart:9:15)
#2      _asyncRunCallback (dart:async/event_loop.dart:13:7)
#3      _createTimer.<anonymous closure> (dart:async-patch/timer_patch.dart:8:13)
#4      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:95:21)
#5      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#6      _Timer._createTimerHandler._handleTimeout (timer_impl.dart:103:7)
#7      _Timer._createTimerHandler.<anonymous closure> (timer_impl.dart:111:23)
#8      _ReceivePortImpl._handleMessage (dart:isolate-patch/isolate_patch.dart:81:92)

Can anyone help me understand what is going on here?

Thanks,


1 Answer


After debugging the Dart code for your example, it seems to me that this is a bug.

The take call generates a Take-stream that holds a reference to a Where-stream, which in turn holds a reference to the broadcast stream stream1. Incidentally, because stream1 is a broadcast stream, the Where- and Take-streams are broadcast streams too (checked during debugging).

The counter for the number of events is a state variable of the Take-stream, but each time you subscribe to the Take-stream (by calling listen), this creates a subscription to the Where-stream, which in turn creates a subscription to stream1. In this case you have 2 subscriptions to stream2 (1 via listen, 1 via the controller), resulting in 2 subscriptions to stream1 (so 4 in total because of stream3, but this is not important for the discussion).

As a consequence, when stream1 fires event 0, it passes twice through the Take-stream in stream2, decrementing the Take-stream's counter twice and causing the onDone event to be passed to the second subscription only. Because the counter has been decremented by 2 and it is a state variable of the Take-stream, the Take-stream will not fire again when stream1 fires event 2.
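
To make the shared-counter effect concrete, here is a minimal synchronous sketch of the behaviour described above. It is a simplified model, not the dart:async source: the class SharedCounterTake and its members are hypothetical names introduced only for illustration, and the where filter is modelled by feeding in even numbers only.

```dart
// Simplified, synchronous model of the behaviour described above.
// NOT the dart:async implementation; all names here are hypothetical.
class SharedCounterTake {
  int remaining; // one counter shared by every subscription
  final List<List<int>> received = []; // events delivered, per subscription
  final List<bool> done = []; // whether each subscription saw onDone

  SharedCounterTake(this.remaining);

  /// Registers a subscription and returns its index.
  int subscribe() {
    received.add(<int>[]);
    done.add(false);
    return received.length - 1;
  }

  /// The broadcast source delivers one event to each subscription in turn.
  void sourceEvent(int n) {
    for (var i = 0; i < received.length; i++) {
      if (remaining > 0) {
        received[i].add(n);
        remaining--;
        // Only the subscription that uses up the counter is closed.
        if (remaining == 0) done[i] = true;
      }
    }
  }
}

void main() {
  final take = SharedCounterTake(2); // models where(even).take(2)
  final viaListen = take.subscribe(); // the explicit listen call
  final viaController = take.subscribe(); // the controller's addStream

  for (final n in [0, 2, 4]) {
    take.sourceEvent(n); // even events coming from the source
  }

  // prints "listen: [0] done=false"
  print('listen: ${take.received[viaListen]} '
      'done=${take.done[viaListen]}');
  // prints "controller: [0] done=true"
  print('controller: ${take.received[viaController]} '
      'done=${take.done[viaController]}');
}
```

In this model the single event 0 uses up both slots of the counter, so the listen subscription sees one event and no onDone, which matches the "stream2 : 0" line followed by silence in the first output.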

So this looks like a bug to me. Two observations.

  1. Should the take-stream counter be a state variable in the subscription to the take-stream instead of the take-stream itself?

  2. Should every subscription to the Take-Stream result in a new subscription to the source stream? (see also ForwardingStream in the Dart code). This might depend on the broadcast-context.
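
As a rough illustration of the first observation, the sketch below keeps the counter per subscription instead of per stream. Again, this is only a simplified synchronous model under my own assumptions, not how dart:async is or should be implemented; PerSubscriptionTake and its members are hypothetical names.

```dart
// Simplified model with one counter per subscription.
// NOT the dart:async implementation; all names here are hypothetical.
class PerSubscriptionTake {
  final int count; // how many events each subscription may take
  final List<int> remaining = []; // separate counter per subscription
  final List<List<int>> received = []; // events delivered, per subscription
  final List<bool> done = []; // whether each subscription saw onDone

  PerSubscriptionTake(this.count);

  /// Registers a subscription with its own fresh counter.
  int subscribe() {
    remaining.add(count);
    received.add(<int>[]);
    done.add(false);
    return received.length - 1;
  }

  /// The broadcast source delivers one event to each subscription in turn.
  void sourceEvent(int n) {
    for (var i = 0; i < received.length; i++) {
      if (remaining[i] > 0) {
        received[i].add(n);
        remaining[i]--;
        if (remaining[i] == 0) done[i] = true;
      }
    }
  }
}

void main() {
  final take = PerSubscriptionTake(2); // models where(even).take(2)
  final viaListen = take.subscribe();
  final viaController = take.subscribe();

  for (final n in [0, 2, 4]) {
    take.sourceEvent(n);
  }

  // prints "listen: [0, 2] done=true"
  print('listen: ${take.received[viaListen]} '
      'done=${take.done[viaListen]}');
  // prints "controller: [0, 2] done=true"
  print('controller: ${take.received[viaController]} '
      'done=${take.done[viaController]}');
}
```

With a counter per subscription, both subscribers receive events 0 and 2 and both complete after the second one, which is the behaviour the question expected from take(2).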

Maybe they should have a further look at the Rx-code.

answered 2014-01-14T01:04:55.393