csv ファイルを HDFS から Hive に移動するアプリケーションがあります。そのプロセスには Storm Topology を使用しています。
8台のマシンが使用されています。それぞれに 22 個のコアと 512 GB の RAM があります。ただし、コードの実行は非常に遅くなります。600 万のデータ転送が完了するまでに 10 分かかります。
60 個のファイルのうち 10 MB が 1 秒間に HDFS に転送されます。コードを最適化しようとしていますが、非常に間違ったことをしていることは明らかです。
Hive テーブルには、64 個のバケットがあります。
このトポロジーには、1 つのスパウトと 2 つのボルトがあります。基本的に、Spout は CSV ファイルを取得し、データの解析を担当する最初の Bolt に行を送信し、次に、Bolt が HDFS プロセスを担当する 2 番目の Bolt に送信します。
HDFS スパウト;
HdfsSpout hdfsSpout = new HdfsSpout()
.withOutputFields(TextFileReader.defaultFields)
.setReaderType("text")
.setHdfsUri(hdfsUri)
.setSourceDir("/data/in")
.setArchiveDir("/data/done")
.setBadFilesDir("/data/bad")
.setClocksInSync(true) // NTP installed on all hosts
.setIgnoreSuffix("_COPYING_")
// do not begin reading file until it is completely copied to HDFS
.setMaxOutstanding(50_000);
マッパー;
DelimitedRecordHiveMapper mapper = new DelimitedRecordHiveMapper()
.withColumnFields(new Fields(TTDPIRecord.fieldsList))
.withPartitionFields(new Fields(TTDPIRecord.partitionFieldsList));
ハイブ オプション;
HiveOptions hiveOptions = new HiveOptions(metaStoreURI, dbName, tblName, mapper)
.withAutoCreatePartitions(true)
.withHeartBeatInterval(3)
.withCallTimeout(10_000) // default = 10.000
.withTxnsPerBatch(2)
.withBatchSize(50_000)
// doing below because its affecting storm metrics most likely
.withTickTupleInterval(1);
構成;
Config conf = new Config();
conf.setNumWorkers(6);
conf.setNumAckers(6);
conf.registerMetricsConsumer(LoggingMetricsConsumer.class);
トポロジビルダー;
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("hdfsSpout", hdfsSpout, 8);
builder.setBolt("recordParserBolt", recordParserBolt, 8).localOrShuffleGrouping("hdfsSpout");
builder.setBolt("hiveBolt", hiveBolt, 8).localOrShuffleGrouping("recordParserBolt");
次のパラメータについては不明です。
HDFS スパウトで。.setMaxOutstanding(50_000);
ハイブスパウトオプションで。.withTxnsPerBatch(2) .withBatchSize(50_000) .withTickTupleInterval(1);
設定で; .setNumWorkers(6); .setNumAckers(6);
スパウトとボルトの平行度; それぞれに8つ与えました。
これらのパラメーターの値は何にする必要がありますか? 前もって感謝します。
編集; これは、100 個の csv ファイルの 10 mb のテスト結果です。
hdfsSpout Executor: 8 完全遅延: 1834.209 ミリ秒
recordParserBolt Executor: 8 完全遅延: 0.019 ミリ秒
hiveBolt Executor: 8 完全遅延: 1092.624 ミリ秒