私はKafka + Hbaseを初めて使用します。あなたの助けに感謝。
Kafka メッセージ ストリームを取得し、Kafka から読み取る Hbase にデータを挿入したいと考えています。
トピックからメッセージを読み取る Java コンシューマー (単一トピック、単一パーティション、および 1 つのスレッド) を作成しました。すべて良好。スレッドで、Hbase に接続してメッセージをテーブルに挿入しようとしていますが、機能していません。hbase に接続しようとする (HTable のインスタンスを作成する) 瞬間に、ストリームを読み取る kafka スレッドが強制終了されます。これを克服してHbaseにデータを挿入するにはどうすればよいですか? この問題に関するあなたの考えと助けに大いに感謝します。
Kafka 高レベル コンシューマー コード
public void run() {
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
// Read Message
System.out.println("Thread " + m_threadNumber + ": " + new String(it.next().message()));
//Load into HBase
// Instantiating Configuration class
Configuration config = HBaseConfiguration.create();
config.clear();
//config.set("hbase.zookeeper.quorum", "localhost");
//config.set("hbase.zookeeper.property.clientPort","2181");
config.set("hbase.zookeeper.quorum", "localhost");
config.set("hbase.zookeeper.property.clientPort","2181");
config.set("hbase.master", "localhost");
// Instantiating HTable class
HTable hTable = new HTable(config, "device");
// Instantiating Put class
// accepts a row name.
Put p = new Put(Bytes.toBytes("id14"));
// adding values using add() method
// accepts column family name, qualifier/row name ,value
p.add(Bytes.toBytes("d_name"), Bytes.toBytes("primary"),Bytes.toBytes("ios"));
// Saving the put Instance to the HTable.
hTable.put(p);
// closing HTable
hTable.close();