0

SingleColumnValueFilter を使用して、削除したい行のリストを返しています。

SingleColumnValueFilter fileTimestampFilter = new SingleColumnValueFilter(
         Bytes.toBytes('a'),
         Bytes.toBytes('date'),
         CompareFilter.CompareOp.GREATER,
         Bytes.toBytes("20140101000000")
         );    

次に、Delete オブジェクトを作成し、各列を削除します。

Delete delete = new Delete(Bytes.toBytes(rowKey));
delete.deleteColumn(Bytes.toBytes('a'), Bytes.toBytes('date'));
htable.delete(delete);

取得コードは

private List<String> getRecordsToDelete(long maxResultSize)
{
  ResultScanner rs = null;
  HTableInterface table = null;
  List<String> keyList = new ArrayList<String>();
  try
  {
    log.debug("Retrieving records");      
    HbaseConnection hbaseConnectionConfig = myConfig.getHbaseConnection();
    Configuration configuration = getHbaseConfiguration(hbaseConnectionConfig);
    table = new HTable(configuration, 'mytable');
    FilterList list = new FilterList(FilterList.Operator.MUST_PASS_ALL);
    Filter filter = HbaseDao.getFilter();
    list.addFilter(filter);
    list.addFilter(new PageFilter(maxResultSize));
    Scan scan = new Scan();
    scan.setFilter(list);
    //scan.setMaxResultSize(maxResultSize);
    //scan.setCaching(1);
    //scan.setCacheBlocks(false);
    //log.debug("Scan raw? = " + scan.isRaw());
    //scan.setRaw(false);
    rs = table.getScanner(scan);      
    Iterator<Result> iterator = rs.iterator();      
    while (iterator.hasNext())
    {        
      Result result = iterator.next();        
      String key = Bytes.toString(result.getRow());
      log.debug("**************** f key = " + key); //the same keys are always added here
      keyList.add(key);        
    }
    log.debug("Done processing retrieval of records to delete Size = " + keyList.size());
  }
  catch (Exception ex)
  {
    log.error("Unable to process retrieval of records.", ex);
  }
  finally
  {
    try
    {
      if (table !=  null)
      {
        table.close();
      }
      if (rs != null)
      {
        rs.close();
      }
    }
    catch (IOException ioEx)
    {
      //do nothing
      log.error(ioEx);
    }
  }
  return keyList;
}

このタスクはスケジュールされており、再度実行すると同じ行が取得されます。hbase は行を削除するようにマークし、メジャー圧縮後にのみ物理的に削除されることを理解しています。タスクの実行中に hbase シェルを介して行をクエリすると、列は確実に削除されています。このタスクの後続の実行でスキャンが同じ行を返すのはなぜですか?

前もって感謝します!

4

1 に答える 1

1

主要な圧縮とは関係ありません (デフォルトでは ~24 時間ごとに実行されます)。行を削除すると、削除されたデータは、最終的に (major_compactions で) 削除されるまで HBase によって無視されます。autoflush を有効にしていない場合は、htable.flushCommits()(デフォルトでは autoflush=on) を呼び出して、最初にクライアント バッファを手動でフラッシュする必要があることに注意してください。

値が存在しない場合のデフォルトの動作であるため、削除するだけa:dateで行に読み取り中の列があり、フィルターを通過しているため、問題が発生した可能性があります。


行全体を削除する場合は、列だけでなく行を削除するだけdelete.deleteColumn(Bytes.toBytes('a'), Bytes.toBytes('date'));です。


a:date行の残りの部分はそのままにして列を削除するだけの場合は、filterIfMissing フラグを設定して、行がa:date == null通過しないようにします (削除されているため)。filter.setFilterIfMissing(true);

または、最高のパフォーマンスを得るには、その列だけをスキャンに追加します。これにより、他の列が読み取られなくなります。scan.addColumn(Bytes.toBytes('a'), Bytes.toBytes('date'));


list.addFilter(new PageFilter(maxResultSize));余談ですが、テーブルの各領域から maxResultSize の結果を取得することに注意してください。

もう 1 つのヒントとして、デバッグ目的でログを記録する場合は、その内容を正確に確認するために常に完全な結果をログに記録してください。

于 2014-12-31T06:49:15.760 に答える