38

にアイテムを送信/送信する方法を知りたかったKotlin.Flowので、私のユースケースは次のとおりです。

consumer/ViewModel/Presenter では、次の 関数でサブスクライブできます。collect

fun observe() {
 coroutineScope.launch {
    // 1. Send event
    reopsitory.observe().collect {
      println(it)
    }
  }
}

しかし、問題はRepository脇にあります。RxJava では、Behaviorsubjectを使用してそれを として公開し、次のObservable/Flowableような新しいアイテムを発行できます。

behaviourSubject.onNext(true)

しかし、新しいフローを構築するたびに:

flow {

}

集めるしかない。値をフローに送信するにはどうすればよいですか?

4

3 に答える 3

48

サブスクリプション/コレクションで最新の値を取得したい場合は、 ConflatedBroadcastChannelを使用する必要があります。

private val channel = ConflatedBroadcastChannel<Boolean>()

これは をレプリケートBehaviourSubjectし、チャネルをフローとして公開します。

// Repository
fun observe() {
  return channel.asFlow()
}

ここで、公開されたFlow単純な送信にイベント/値をこのチャネルに送信します。

// Repository
fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}

コンソール:

間違い

収集を開始した後にのみ値を受け取りたい場合は、BroadcastChannel代わりに a を使用する必要があります。

明確にするために:

Rx として動作します。PublishedSubject

private val channel = BroadcastChannel<Boolean>(1)

fun broadcastChannelTest() {
  // 1. Send event
  channel.send(true)

  // 2. Start collecting
  channel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  channel.send(false)
}

間違い

の前にfalse最初のイベントが送信されたときにのみ出力されます。 collect { }


Rx として動作します。BehaviourSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

fun conflatedBroadcastChannelTest() {
  // 1. Send event
  confChannel.send(true)

  // 2. Start collecting
  confChannel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  confChannel.send(false)
}

真実

間違い

両方のイベントが出力され、常に最新の値が取得されます (存在する場合)。

DataFlowまた、 (名前は保留中)に関する Kotlin のチーム開発について言及したいと思います。

どちらがこのユースケースにより適しているようです (コールド ストリームになるため)。

于 2019-08-04T09:12:57.433 に答える
9

更新

Kotlin コルーチン1.4.0が で利用できるようになりました。MutableSharedFlowこれは の必要性を置き換えますChannelMutableSharedFlowcleanup も組み込まれているため、 とは異なり、手動で OPEN & CLOSE する必要はありませんChannelMutableSharedFlowサブジェクトのような API が必要な場合に使用してください。Flow

元の答え

あなたの質問には タグがあったので、独自のライフサイクルを処理するまたはandroidを簡単に作成できる Android 実装を追加します。BehaviorSubjectPublishSubject

チャネルを閉じてメモリ リークするのを忘れたくないので、これは Android に関連しています。この実装では、リアクティブ ストリームをフラグメント/アクティビティの作成と破棄に関連付けることで、リアクティブ ストリームを明示的に「破棄」する必要がなくなります。に似ているLiveData

interface EventReceiver<Message> {
    val eventFlow: Flow<Message>
}

interface EventSender<Message> {
    fun postEvent(message: Message)
    val initialMessage: Message?
}

class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    override fun postEvent(message: Message) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch { channel.send(message) }
        } else {
            Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        channel.openSubscription()
        initialMessage?.let { postEvent(it) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }
}

class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
    EventReceiver<Message> {
    override val eventFlow: Flow<Message> = channel.asFlow()
}

abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
    EventSender<Message> by LifecycleEventSender<Message>(
        lifecycle,
        coroutineScope,
        channel,
        initialMessage
    )

Androidのライブラリを使用することで、アクティビティ/フラグメントが破棄された後にそれ自体をクリーンアップする をLifecycle作成できるようになりましたBehaviorSubject

class BehaviorSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)

またはPublishSubject、バッファリングされたものを使用して作成できますBroadcastChannel

class PublishSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)

そして今、私はこのようなことをすることができます

class MyActivity: Activity() {

    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        if (savedInstanceState == null) { 

            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)

        }
    }

    override fun onResume() {
        super.onResume()

        behaviorSubject.postEvent("Next Message")
    }
}
于 2020-05-13T21:18:06.950 に答える