0

Arrow レコード バッチのストリームとして結果セットがあり、reader.read_chunk() を使用してバッチを取得しました。バッチをバッチ配列にプッシュし、応答として bytearray に変換しました。そのためのコードは次のとおりです。

def getBatchStreambytes(_):
   reader = client.do_get(flight_info.endpoints[0].ticket, options)
            print('[INFO] Reading query results from Dremio Server ')
            batches = [] 
            while True:
                     try:
                            batch, metadata = reader.read_chunk()
                            print(batch.num_rows)
                            batches.append(batch)

                except Exception as exception:
                        break

        data = pa.Table.from_batches(batches)
        sink = pa.BufferOutputStream()
        writer = pa.RecordBatchStreamWriter(sink, data.schema)
        writer.write_table(data)
        writer.close()
        #print(reader.read_pandas())

        return  sink.getvalue().to_pybytes()

バッチごとにより多くの時間がかかるAPI応答として、APIが反復バッチで応答するようにバッチを応答として送信する方法、バッチのチャンクを送信する方法。ここに、フライトサーバーから受信したバッチのリストのスクリーンショットがあります ここに画像の説明を入力

4

0 に答える 0