4

prePut フックを使用してセカンダリ インデックスを作成する独自のコプロセッサを作成しようとしています。手始めに、私は prePut コプロセッサーを機能させようとしてきました。ここまでで、渡された put オブジェクトにコプロセッサーを追加することができました。私が見つけたのは、渡された put オブジェクトが書き込んでいるものとは別の行にコプロセッサに書き込むことができないということです。セカンダリ インデックスを作成するには、明らかに、これを把握する必要があります。

以下は私のコプロセッサーのコードですが、動作しません。
はい、すべてのテーブルが存在し、「colfam1」も存在します。
HBase バージョン: Cloudera の CDH4 の HBase 0.92.1-cdh4.1.2

誰が問題が何であるか知っていますか?

    @Override
        public void prePut(final ObserverContext<RegionCoprocessorEnvironment> e,final Put put, final WALEdit edit, final boolean writeToWAL) throws IOException {          
            KeyValue kv = new KeyValue(Bytes.toBytes("COPROCESSORROW"), Bytes.toBytes("colfam1"),Bytes.toBytes("COPROCESSOR: "+System.currentTimeMillis()),Bytes.toBytes("IT WORKED"));
            put.add(kv);
        }

次のエラーが表示されます。

    ERROR: org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 1 action: IOException: 1 time, servers with issues:

アップデート:

コプロセッサを次のように変更しましたが、まだエラーが発生します。Post-Put (セカンダリ インデックス) が書き込まれるようになりましたが、まだタイムアウト エラーが発生します。
リージョンのテーブル全体もクラッシュし、リージョンを再起動する必要があります。リージョンの再起動が機能せず、リージョン全体 (すべてのテーブル) が破損し、サーバーの再構築が必要になる場合があります。

わけがわからない…!?

@Override
      public void start(CoprocessorEnvironment env) throws IOException {        
        LOG.info("(start)");
        pool = new HTablePool(env.getConfiguration(), 10);
     }

    @Override
    public void postPut(final ObserverContext<RegionCoprocessorEnvironment> observerContext,final Put put,final WALEdit edit,final boolean writeToWAL) throws IOException {
        byte[] tableName  = observerContext.getEnvironment().getRegion().getRegionInfo().getTableName();

        //not necessary though if you register the coprocessor for the specific table , SOURCE_TBL
        if (!Bytes.equals(tableName, Bytes.toBytes(SOURCE_TABLE))) 
            return;         

        try {           
            LOG.info("STARTING postPut");
            HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));
            LOG.info("TURN OFF AUTOFLUSH");
            table.setAutoFlush(false);
            //create row              
            LOG.info("Creating new row");            
            byte [] rowkey = Bytes.toBytes("COPROCESSOR ROW");
            Put indexput  = new Put(rowkey); 
            indexput.add(Bytes.toBytes ( "data"),  Bytes.toBytes("CP: "+System.currentTimeMillis()),  Bytes.toBytes("IT WORKED!"));
            LOG.info("Writing to table");
            table.put(indexput);
            LOG.info("flushing commits");            
            table.flushCommits();
            LOG.info("close table");
            table.close();

        } catch ( IllegalArgumentException ex) {

            //handle excepion.
        }

      }


      @Override
      public void stop(CoprocessorEnvironment env) throws IOException {
        LOG.info("(stop)");
        pool.close();
      }

リージョンサーバーのログは次のとおりです: (ログのコメントに注意してください)

2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: STARTING postPut
2013-01-30 19:30:39,754 INFO my.package.MyCoprocessor: TURN OFF AUTOFLUSH
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Creating new row
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: Writing to table
2013-01-30 19:30:39,755 INFO my.package.MyCoprocessor: flushing commits
2013-01-30 19:31:39,813 WARN org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation: Failed all from region=test_table,,1359573731255.d41b77b31fafa6502a8f09db9c56b9d8., hostname=node01, port=60020
java.util.concurrent.ExecutionException: java.net.SocketTimeoutException: Call to node01/<private_ip>:60020 failed on socket timeout exception: java.net.SocketTimeoutException: 60000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=/<private_ip>:56390 remote=node01/<private_ip>:60020]
    at java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:222)
    at java.util.concurrent.FutureTask.get(FutureTask.java:83)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatchCallback(HConnectionManager.java:1557)
    at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.processBatch(HConnectionManager.java:1409)
    at org.apache.hadoop.hbase.client.HTable.flushCommits(HTable.java:949)
    at org.apache.hadoop.hbase.client.HTablePool$PooledHTable.flushCommits(HTablePool.java:449)
    at my.package.MyCoprocessor.postPut(MyCoprocessor.java:81)
    at org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost.postPut(RegionCoprocessorHost.java:682)
    at org.apache.hadoop.hbase.regionserver.HRegion.doMiniBatchPut(HRegion.java:1901)
    at org.apache.hadoop.hbase.regionserver.HRegion.put(HRegion.java:1742)
    at org.apache.hadoop.hbase.regionserver.HRegionServer.multi(HRegionServer.java:3102)
    at sun.reflect.GeneratedMethodAccessor22.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:364)
    at org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1345)

解決済み:コプロセッサーが作業しているコプロセッサーの同じテーブルに書き込もうとしていました。要するに、セルを書き込んだときに、CP がセルを書き込んで、CP が再びトリガーされ、別のセルが書き込まれるということでした。このループを防ぐために、CP行を書き込む行チェックb4を実行して停止しました。

4

2 に答える 2

5

以下は、Hbaseでコプロセッサーを使用してセカンダリインデックスを作成する方法に関するコードスニペットです。あなたに役立つことができます。

public class TestCoprocessor extends BaseRegionObserver{

    private HTablePool pool = null;

    private final static String  INDEX_TABLE  = "INDEX_TBL";
    private final static String  SOURCE_TABLE = "SOURCE_TBL";

    @Override
    public void start(CoprocessorEnvironment env) throws IOException {  
        pool = new HTablePool(env.getConfiguration(), 10);
    }

    @Override
    public void postPut(
        final ObserverContext<RegionCoprocessorEnvironment> observerContext,
        final Put put,
        final WALEdit edit,
        final boolean writeToWAL)
        throws IOException {

        byte[] table = observerContext.getEnvironment(
            ).getRegion().getRegionInfo().getTableName();

        // Not necessary though if you register the coprocessor
        // for the specific table, SOURCE_TBL
        if (!Bytes.equals(table, Bytes.toBytes(SOURCE_TABLE))) {
            return; 
        }

        try {
            final List<KeyValue> filteredList = put.get(
                Bytes.toBytes ( "colfam1"), Bytes.toBytes(" qaul"));
            filteredList.get( 0 ); //get the column value

            // get the values 
            HTableInterface table = pool.getTable(Bytes.toBytes(INDEX_TABLE));

            // create row key             
            byte [] rowkey = mkRowKey () //make the row key
            Put indexput = new Put(rowkey); 
            indexput.add(
                Bytes.toBytes( "colfam1"),
                Bytes.toBytes(" qaul"),
                Bytes.toBytes(" value.."));

            table.put(indexput);
            table.close();

        } catch ( IllegalArgumentException ex) {
            // handle excepion.
        }

    }


    @Override
    public void stop(CoprocessorEnvironment env) throws IOException {
        pool.close();
    }

}

上記のコプロセッサーをSOURCE_BLに登録するには、hbaseシェルに移動し、以下の手順に従います。

  1. 'SOURCE_TBL'を無効にする
  2. alter'SOURCE_TBL'、METHOD =>'table_att'、'coprocessor' =>'file:///path/to/coprocessor.jar | TestCoprocessor | 1001'
  3. 'SOURCE_TBL'を有効にする
于 2013-01-27T06:48:38.520 に答える
-2

セカンダリ インデックスが HBase に組み込まれました。同じことについては、このブログエントリを見てください。同じために HBase で CoProcessors を使用する必要はありません。

于 2013-01-27T02:30:10.000 に答える