1

csv ファイルを HDFS から Hive に移動するアプリケーションがあります。そのプロセスには Storm Topology を使用しています。

8台のマシンが使用されています。それぞれに 22 個のコアと 512 GB の RAM があります。ただし、コードの実行は非常に遅くなります。600 万のデータ転送が完了するまでに 10 分かかります。

60 個のファイルのうち 10 MB が 1 秒間に HDFS に転送されます。コードを最適化しようとしていますが、非常に間違ったことをしていることは明らかです。

Hive テーブルには、64 個のバケットがあります。

このトポロジーには、1 つのスパウトと 2 つのボルトがあります。基本的に、Spout は CSV ファイルを取得し、データの解析を担当する最初の Bolt に行を送信し、次に、Bolt が HDFS プロセスを担当する 2 番目の Bolt に送信します。

HDFS スパウト;

HdfsSpout hdfsSpout = new HdfsSpout()
    .withOutputFields(TextFileReader.defaultFields)
    .setReaderType("text")
    .setHdfsUri(hdfsUri)
    .setSourceDir("/data/in")
    .setArchiveDir("/data/done")
    .setBadFilesDir("/data/bad")
    .setClocksInSync(true) // NTP installed on all hosts
    .setIgnoreSuffix("_COPYING_") 
// do not begin reading file until it is completely copied to HDFS
    .setMaxOutstanding(50_000);

マッパー;

DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
    .withColumnFields(new Fields(TTDPIRecord.fieldsList))
    .withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));

ハイブ オプション;

HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
    .withAutoCreatePartitions(true)
    .withHeartBeatInterval(3)
    .withCallTimeout(10_000) // default = 10.000
    .withTxnsPerBatch(2)
    .withBatchSize(50_000) 
// doing below because its affecting storm metrics most likely
    .withTickTupleInterval(1);

構成;

Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);

トポロジビルダー;

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");

次のパラメータについては不明です。

HDFS スパウトで。.setMaxOutstanding(50_000);

ハイブスパウトオプションで。.withTxnsPerBatch(2) .withBatchSize(50_000) .withTickTupleInterval(1);

設定で; .setNumWorkers(6); .setNumAckers(6);

スパウトとボルトの平行度; それぞれに8つ与えました。

これらのパラメーターの値は何にする必要がありますか? 前もって感謝します。

編集; これは、100 個の csv ファイルの 10 mb のテスト結果です。

hdfsSpout Executor: 8 完全遅延: 1834.209 ミリ秒

recordParserBolt Executor: 8 完全遅延: 0.019 ミリ秒

hiveBolt Executor: 8 完全遅延: 1092.624 ミリ秒

4

1 に答える 1