1

2 つのクラスターがあります。

  • コンフルエントな家の中の1つ(3.0.0-1)
  • AWS に 1 つ、hadoop (hdp 2.4) を使用

hdfs コネクタを使用して、コンフルエントから Hadoop に書き込もうとしています。

簡単に言えば、コネクタは、ホスト名を使用する代わりに、hadoop クラスターのプライベート IP に接続しようとします。社内クラスターでは、/etc/hosts が更新され、内部 Hadoop ホスト名が関連するパブリック IP に解決されます。

私は分散コネクタを使用しています。次のようなコネクタ JSON ファイルがたくさんあります。

{
   "name": "sent-connector",

   "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
   "tasks.max": "1",
   "topics": "sent",

   "topics.dir":"/kafka-connect/topics",
   "logs.dir":"/kafka-connect/wal",
   "hdfs.url": "hdfs://ambari:8020",

   "hadoop.conf.dir": "/etc/hadoop/conf",
   "hadoop.home": "/usr/hdp/current/hadoop-client",

   "flush.size": "100",

   "hive.integration":"true",
   "hive.metastore.uris":"thrift://ambari:9083",
   "hive.database":"events",
   "hive.home": "/usr/hdp/current/hive-client",
   "hive.conf.dir": "/etc/hive/conf",

   "schema.compatibility":"FULL",

   "partitioner.class": "io.confluent.connect.hdfs.partitioner.HourlyPartitioner",
   "path.format": "'year'=YYYY/'month'=MM/'day'=dd/'hour'=HH/",
   "locale": "C",
   "timezone": "UTC",

   "rotate.interval.ms": "2000"

}

ワーカーは次のように定義されます。

rest.port=8083
bootstrap.servers=<eth0 IP of the server>:9092
group.id=dp2hdfs
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=schemareg.dpe.webpower.io
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=schemareg.dpe.webpower.io
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
config.storage.topic=k2hdfs-configs
offset.storage.topic=k2hdfs-offsets
status.storage.topic=k2hdfs-statuses
debug=true

いくつかのメモ:

  • /kafka-connect は hdfs に存在し、誰でも書き込み可能
  • 3 つのトピック (*.storage.topic) は存在します
  • Kafkaブローカーを使用して各(3)サーバーで1つのワーカーを実行しています(すべてのブローカーにスキーマレジストリ、残りのAPI、およびzookeeperサーバーがあります)
  • dfs.client.use.datanode.hostname を true に設定しました。このプロパティはクライアントの $HADOOP_HOME/hdfs-site.xml に設定されています。

/kafka-connect のサブディレクトリとハイブ メタデータが作成されていることがわかります。コネクタを起動すると、次のメッセージが表示されます。

createBlockOutputStream での INFO 例外 (org.apache.hadoop.hdfs.DFSClient:1471) org.apache.hadoop.net.ConnectTimeoutException: チャネルの接続準備が整うまでの待機中に 60000 ミリ秒のタイムアウトが発生しました。ch : java.nio.channels.SocketChannel[org.apache.hadoop.net.NetUtils.connect(NetUtils.java:533) の接続保留リモート org.apache.hadoop.hdfs.DFSOutputStream.createSocketForPipeline(DFSOutputStream.java: 1610) org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.createBlockOutputStream(DFSOutputStream.java:1408) org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.nextBlockOutputStream(DFSOutputStream.java:1361) org.apache.hadoop .hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:588) INFO 破棄 BP-429601535-10.0.0.167-1471011443948:blk_1073742319_1495 (org.apache.hadoop.hdfs.DFSClient:

これを修正する方法について何か考えはありますか? コンフルエントは、ホスト名ではなく IP を直接受け取るようです。

4

0 に答える 0