0

Kotlin フローに慣れ始めたばかりです。

このために、それらを使用して、次のフローを使用してシミュレートするバイナリ ファイルの内容を解析しています。

fun testFlow() = flow {
    println("Starting loop")

    try {
        for (i in 0..5) {
            emit(i)
            delay(100)
        }

        println("Loop has finished")
    }
    finally {
        println("Finally")
    }
}

さて、基本的にさまざまな情報セットを抽出するために、ファイルの内容を複数回必要とします。ただし、ファイルを 2 回読み取るのではなく、1 回だけ読み取ります。

フローを複製/複製する組み込みのメカニズムがないように見えるため、次のヘルパー関数を開発しました。

interface MultiConsumeBlock<T> {
    suspend fun subscribe(): Flow<T>
}

suspend fun <T> Flow<T>.multiConsume(capacity: Int = DEFAULT_CONCURRENCY, scope: CoroutineScope? = null, block: suspend MultiConsumeBlock<T>.() -> Unit) {
    val channel = buffer(capacity).broadcastIn(scope ?: CoroutineScope(coroutineContext))

    val context = object : MultiConsumeBlock<T> {
        override suspend fun subscribe(): Flow<T> {
            val subscription = channel.openSubscription()
            return flow { emitAll(subscription) }
        }
    }
    try {
        block(context)
    } finally {
        channel.cancel()
    }
}

次に、これを次のように使用します (ファイルとの類推について考えてください: フローaはすべてのレコードを取得bし、最初の 3 つのレコード (="ファイル ヘッダー") のみをフローcし、ヘッダーの後にすべてをフローします):

fun main() = runBlocking {
    val src = testFlow()

    src.multiConsume {
        val a = subscribe().map { it }
        val b = subscribe().drop(3).map{ it + it}
        val c = subscribe().take(3).map{ it * it}

        mapOf("A" to a, "B" to b, "C" to c).map { task -> launch { task.value.collect{ println("${task.key}: $it")} } }.toList().joinAll()
    }
}

出力:

Starting loop
A: 0
C: 1
A: 1
C: 2
A: 4
C: 3
A: 9
C: 4
A: 16
C: 5
B: 10
C: 6
B: 12
C: 7
B: 14
C: 8
B: 16
C: 9
B: 18
C: 10
B: 20
C: 11
Loop has finished
Finally

これまでのところ良さそうです。ただし、この点で Kotlin のフローを正しく使用しているかどうかはわかりません。
デッドロックや見逃した例外などに自分自身を開放していますか?

ドキュメントには次のように記載されています

Flow インターフェイスのすべての実装は、以下で詳しく説明する 2 つの主要なプロパティに準拠する必要があります。

  • コンテキストの保存。
  • 例外的な透明性。

しかし、それが私の実装の場合なのか、それとも何か不足しているのかはわかりません。
それとも、もっと良い方法がありますか?

4

0 に答える 0