1

Kafka ログを使用して HDFS に書き込む Apache Apex アプリケーションがあります。

DAG は、ストリームによって "MyWriter extends AbstractFileOutputOperator" に接続された Kafka Consumer (オペレーター用の 2 GB メモリーの 20 パーティション) があるほど単純です。

問題: 1. Writer が同じサイズと同じデータの .tmp ファイルを何度も繰り返し書き込んでいるのを見てきました。Write Operator メモリを増やしたり、Writer のパーティション数を増やしたりしましたが、それでもこの問題は発生し続けます。

MyWriter に requestFinalize を追加・削除してみました。まだ同じ問題。

 @Override
    public void endWindow()
    {
        if (null != fileName) {
            requestFinalize(fileName);
        }
        super.endWindow();
    }

これは私の properties.xml のサブセットです

<property>
    <name>dt.attr.STREAMING_WINDOW_SIZE_MILLIS</name>
    <value>1000</value>
  </property>

  <property>
    <name>dt.application.myapp.operator.*.attr.APPLICATION_WINDOW_COUNT</name>
    <value>60</value>
  </property>

  <property>
    <name>dt.application.myapp.operator.*.attr.CHECKPOINT_WINDOW_COUNT</name>
     <value>60</value>
  </property>

 <property>
        <name>dt.application.myapp.operator.myWriter.attr.PARTITIONER</name>
        <value>com.datatorrent.common.partitioner.StatelessPartitioner:20</value>
    </property>

  <property>
    <name>dt.application.myapp.operator.myWriter.prop.maxLength</name>
    <value>1000000000</value> <!-- 1 GB File -->
  </property>

これは、オペレーターの dt.log から取得できたスタック トレースです。オペレーターは、おそらく別のコンテナーに再デプロイされ、この例外をスローし、重複ファイルを書き続けます。

 java.lang.RuntimeException: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:418)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:112)
        at com.datatorrent.stram.engine.Node.setup(Node.java:187)
        at com.datatorrent.stram.engine.StreamingContainer.setupNode(StreamingContainer.java:1309)
        at com.datatorrent.stram.engine.StreamingContainer.access$100(StreamingContainer.java:130)
        at com.datatorrent.stram.engine.StreamingContainer$2.run(StreamingContainer.java:1388)
    Caused by: java.io.FileNotFoundException: File does not exist: /kafkaconsumetest/inventoryCount/nested/trial2/1471489200000_1471489786800_161.0.1471489802786.tmp
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1219)
        at org.apache.hadoop.hdfs.DistributedFileSystem$19.doCall(DistributedFileSystem.java:1211)
        at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
        at org.apache.hadoop.hdfs.DistributedFileSystem.getFileStatus(DistributedFileSystem.java:1211)
        at com.datatorrent.lib.io.fs.AbstractFileOutputOperator.setup(AbstractFileOutputOperator.java:411)
        ... 5 more
2016-08-17 22:17:01,108 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy request: [161, 177]
2016-08-17 22:17:01,116 INFO com.datatorrent.stram.engine.StreamingContainer: Undeploy complete.
2016-08-17 22:17:02,121 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:02,625 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
2016-08-17 22:17:03,129 INFO com.datatorrent.stram.engine.StreamingContainer: Waiting for pending request.
4

1 に答える 1

1

基本演算子のコードは次のリンクにあり、以下のコメントで参照されています: https://github.com/apache/apex-malhar/blob/master/library/src/main/java/com/datatorrent/lib /io/fs/AbstractFileOutputOperator.java

最大ファイル サイズを 1 GB に設定すると、ローリング ファイルが自動的に有効になります。関連するフィールドは次のとおりです。

protected Long maxLength = Long.MAX_VALUE;
protected transient boolean rollingFile = false;

setup()前者の値がデフォルト値の より小さい場合、後者はメソッドで true に設定されますLong.MAX_VALUE

ローリング ファイルが有効になっている場合、ファイルのファイナライズは自動的に行われるため、 を呼び出さないでくださいrequestFinalize()

次に、MyWriterクラスでendWindow()オーバーライドを削除し、メソッドにオペレーター ID を含む必要なファイル名を作成しsetup()、オーバーライドでこのファイル名を返すようにしgetFileName()ます。これにより、複数のパーティショナーが互いに影響を与えることがなくなります。例えば:

@NotNull
private String fileName;           // current base file name

private transient String fName;    // per partition file name

@Override
public void setup(Context.OperatorContext context)
{
  // create file name for this partition by appending the operator id to
  // the base name
  //
  long id = context.getId();
  fName = fileName + "_p" + id;
  super.setup(context);

  LOG.debug("Leaving setup, fName = {}, id = {}", fName, id);
}

@Override
protected String getFileName(Long[] tuple)
{
  return fName;
}

ファイル ベース名 (fileName上記のコード内) は、コード内で直接設定するか、XML ファイル内のプロパティから初期化することができます (同様にゲッターとセッターを追加する必要があります)。

このタイプの使用例は、 https ://github.com/DataTorrent/examples/tree/master/tutorials/fileOutput で確認できます。

いくつかの追加の提案:

  1. PARTITIONERパーティション カウントを 1 に設定 (または属性を設定する XML をコメント アウト) し、すべてが期待どおりに機能することを確認します。これにより、パーティショニングに関連しない問題が解消されます。可能であれば、最大ファイル サイズを 2K または 4K に減らして、テストを容易にします。
  2. シングル パーティションのケースが機能したら、パーティションの数を 2 に増やします。
于 2016-08-19T14:36:42.523 に答える