4

fs2filteredを使用して現在のストリーミング ファイルから行を削除し、フィルター処理された行の数を戻り値の型として取得する方法は?

例:old.txtに改行 (\n) で区切られた文字列が含まれている場合:

 john
 sam
 chen
 yval
 ....

val myList = List("chen","yval")

def converter[F[_]](implicit F: Sync[F]): F[Unit] =
  io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    .filter(s => myList.contains(s))//remove this from the old file and write to new file
    .intersperse("\n")
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get("testdata/new.txt")))
    .compile.drain

// at the end of the universe...
val u: Unit = converter[IO].unsafeRunSync()
4

1 に答える 1

3

クラスのobserveメソッドを使用できます。Stream

関数 を探しています。この関数def converter[F[_]: Sync]: F[Int]は、F[Int](タイプ のInt)結果がフィルター処理された行の数であり、その効果はそれらの行を出力ファイルに書き込むことです。配管の類推に従うと、フィルタリングされたストリームを 2 つの出力 (1 つは結果用、もう 1 つは効果用)にフィードする必要があります。次のように定義された関数observeを使用してこれを行うことができます。

def observe(sink: Sink[F, O])(implicit F: Effect[F], ec: ExecutionContext): Stream[F, O] 

ASink[F,O]は関数のエイリアスですStream[F, O] => Stream[F, Unit]。あなたの場合、シンクは、フィルタリングされたストリームを出力ファイルに書き込むコードの一部です。

def writeToFile[F[_]: Sync]: Sink[F, String] = 
  _.intersperse("\n")
  .through(text.utf8Encode)
  .through(io.file.writeAll(Paths.get("testdata/new.txt")))

もう1つの出力は、削減、またはむしろ折りたたむことです。

  def converter[F[_]](implicit F: Effect[F], ec: ExecutionContext): F[Int] = 
    io.file.readAll[F](Paths.get("testdata/old.txt"), 4096)
      .through(text.utf8Decode)
      .through(text.lines)
      .filter(s => myList.contains(s))
      .observe(writeToFile[F])
      .compile
      .fold[Int](0)( (c, _) => c+1)
}

: このソリューションでは、型クラスを に制限するF必要Effectがあり、 を使用する必要がありますExecutionContext。はクラスfoldで定義されています。ToEffect

于 2018-07-12T23:23:38.530 に答える