3

声明

10 台のマシン HBase クラスターと数十億の行があります。すべての行は、1 つの列ファミリーと最大 20 列で構成されます。開始行プレフィックスと終了行プレフィックスを含む頻繁なスキャン リクエストを実行する必要があります。通常、すべてのスキャンで約 100 ~ 10000 行が返されます。

リクエストは非常に頻繁に (1 分間に最大数リクエスト) 送信される可能性があるため、パフォーマンスが優先されます。システムのアーキテクチャのため、現在の Java コードではなく Python でソリューションを実現したいと考えています。問題は、Python では Java よりも 5 倍から 10 倍悪いパフォーマンスが得られることです。

現在機能しているもの

HBase へのスキャン要求を実行する Java コードがあります。通常の HBase Java API を使用します。

public List<String> getNumber(Number key) {
    List<String> res = new ArrayList<>();

    String start_key = key.getNumber();
    String next_key = key.getNumber() + "1";
    byte[] prefix_begin = Bytes.toBytes(start_key);
    byte[] prefix_end = Bytes.toBytes(next_key);
    Scan scan = new Scan(prefix_begin, prefix_end);
    ResultScanner scanner = table.getScanner(scan);
    for (Result result : scanner) {
        byte[] row = result.getRow();
        res.add(Bytes.toString(row));
    }
    
    return res;
}

Callableこれらのクエリは、インターフェイスとを使用して並列化されましたScheduledThreadPoolExecutorcall()すべての callableのメソッドは rungetNumber(Number key)です。

public List<String> getNumbers(List<Number> keys) {
    List<String> res = new ArrayList<String>();

    List<Callables.CallingCallable> callables = new ArrayList();
    for (Number source : keys) {
        callables.add(new Callables.CallingCallable(this, source));
    }

    Object futures = new ArrayList();
    ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(24);

    try {
        futures = executor.invokeAll(callables);
    } catch (InterruptedException ex) {
    }

    executor.shutdown();
}

これはかなりうまく機能し、次のパフォーマンスを達成できます。

  • 1 回のスキャンあたり1.5 ~ 2.0 秒
  • 100 並列スキャンあたり5.0 ~ 8.0 秒

私たちが試みること

Happybaseライブラリの助けを借りて、Python で同様のソリューションを実装しようとしています。

@staticmethod
def execute_query(key, table_name, con_pool):
        items = []
        with con_pool.connection() as connection:
            table = happybase.Table(table_name, connection)
            [row_start, row_end] = get_start_and_end_row(key)
            selected_rows = table.scan(row_start=row_start, row_stop=row_end)
            for key, data in selected_rows:
                items.append(Item(data))
        return items

@staticmethod
def execute_in_parallel(table_name, hbase_host, hbase_port, keys):
        pool = ThreadPool(24)
        con_pool = happybase.ConnectionPool(size=24, host=hbase_host, port=hbase_port)
        execute_query_partial = partial(execute_query, table_name=table_name, con_pool=con_pool)
        result_info = pool.map_async(execute_query_partial, keys, chunksize=1)
        result = result_info.get()

達成された性能:

  • 1 回のスキャンあたり2.0 ~ 3.0 秒
  • 100 並列スキャンあたり30 ~ 55 秒

ご覧のとおり、シングル スキャンのパフォーマンスは非常に似ています。しかし、Python で並列化されたタスクははるかに遅くなります。

なぜそれが起こるのですか?Python/Happybase コードに問題があるのではないでしょうか? または、HBase Thrift サーバー (HappyBase が HBase への接続に使用するサーバー) のパフォーマンスは?

4

1 に答える 1