4

この質問は、前の質問の続きです。File.openRead()行ごとにストリーミングできるストリームが作成されたかどうかを判断するために、次のコードを書きました。答えはノーであることがわかりました。ファイル全体が読み取られ、次の変換に渡されます。私の質問は次のとおりです:Dartでファイルを1行ずつストリーミングするにはどうすればよいですか?

import 'dart:async';
import 'dart:convert';
import 'dart:io';


void main(List<String> arguments) {

  Stream<List<int>> stream = new File('Data.txt').openRead();

   stream
      .transform(const Utf8InterceptDecoder())
        .transform(const LineSplitterIntercept())
          .listen((line) {
//            stdout.writeln(line);
          }).asFuture().catchError((_) => print(_));
}

int lineSplitCount = 0;

class LineSplitterIntercept extends LineSplitter {

  const LineSplitterIntercept() : super();
  // Never gets called
  List<String> convert(String data) {
    stdout.writeln("LineSplitterIntercept.convert : Data:" + data);
    return super.convert(data);
  }

  StringConversionSink startChunkedConversion(ChunkedConversionSink<String> sink) {
    stdout.writeln("LineSplitterIntercept.startChunkedConversion Count:"+lineSplitCount.toString()+ " Sink: " + sink.toString());
    lineSplitCount++;
    return super.startChunkedConversion(sink);
  }
}

int utfCount = 0;

class Utf8InterceptDecoder extends Utf8Decoder {

  const Utf8InterceptDecoder() : super();

  //never gets called
  String convert(List<int> codeUnits) {
    stdout.writeln("Utf8InterceptDecoder.convert : codeUnits.length:" + codeUnits.length.toString());
    return super.convert(codeUnits);
  }


  ByteConversionSink startChunkedConversion(ChunkedConversionSink<String> sink) {
    stdout.writeln("Utf8InterceptDecoder.startChunkedConversion Count:"+ utfCount.toString() + " Sink: "+ sink.toString());
    utfCount++;
    return super.startChunkedConversion(sink);
  }
}
4

4 に答える 4

2

コンバーターstartChunkedConversionは、変換が開始されたときに一度だけ呼び出されます。ただし、返されたシンクのaddメソッドは、ファイルの一部で複数回呼び出されます。

チャンクの大きさを決定するのはソース次第ですが、37MB のファイル (前の質問で述べたように) は間違いなく小さなチャンクで送信されます。

チャンクを見たい場合はstartChunkedConversion、ラップされたシンクをインターセプトして返すか、openReadとトランスフォーマーの間に身を置くことができます。

インターセプト:

class InterceptSink {
  static int lineSplitCount = 0;

  final _sink;
  InterceptSink(this._sink);
  add(x) {
    print("InterceptSink.add Count: $lineSplitCount");
    lineSplitCount++;
    _sink.add(x);
  }
  close() { _sink.close(); }
}

class LineSplitterIntercept extends Converter {
  convert(x) { throw "unimplemented"; }
  startChunkedConversion(outSink) {
    var lineSink = new LineSplitter().startChunkedConversion(outSink);
    return new InterceptSink(lineSink);
  }
}

openRead:

file.openRead()
  .transform(UTF8.decoder)
  .map(x) {
    print("chunk size: ${x.length)");
    return x;
  }
  .transform(new LineSplitter())
  ...
于 2013-12-29T12:34:31.077 に答える