3

次を使用して Python API を介して BigQuery にクエリを実行する場合:

service.jobs().getQueryResults

最初の試行は正常に機能することがわかっています。期待されるすべての結果が応答に含まれています。ただし、最初のクエリの直後 (およそ 5 分以内) に 2 回目のクエリを実行すると、結果の小さなサブセットのみが (2 の累乗で) ほぼ瞬時に返され、エラーは発生しません。

https://github.com/sean-schaefer/pandas/blob/master/pandas/io/gbq.pyで完全なコードを参照して ください。

これを引き起こす原因について何か考えはありますか?

4

1 に答える 1

1

問題は、query() と getQueryResults() で異なるデフォルトの行数を返すことにあるようです。したがって、クエリがすぐに終了したかどうかに応じて (したがって getQueryResults() を使用する必要はありませんでした)、取得する行数が増減します。

バグを報告したので、すぐに修正する必要があります。

回避策 (および全体的に良い考え) は、クエリと getQueryResults 呼び出しの両方に maxResults を設定することです。また、多数の行が必要な場合は、返されたページ トークンを使用して結果をページングすることができます。

以下は、完了したクエリ ジョブから 1 ページのデータを読み取る例です。これは、bq.py の次のリリースに含まれます。

class _JobTableReader(_TableReader):
  """A TableReader that reads from a completed job."""

  def __init__(self, local_apiclient, project_id, job_id):
    self.job_id = job_id
    self.project_id = project_id
    self._apiclient = local_apiclient

  def ReadSchemaAndRows(self, max_rows=None):
    """Read at most max_rows rows from a table and the schema.

    Args:
      max_rows: maximum number of rows to return.

    Raises:
      BigqueryInterfaceError: when bigquery returns something unexpected.

    Returns:
      A tuple where the first item is the list of fields and the
      second item a list of rows.
    """
    page_token = None
    rows = []
    schema = {}
    max_rows = max_rows if max_rows is not None else sys.maxint
    while len(rows) < max_rows:
      (more_rows, page_token, total_rows, current_schema) = self._ReadOnePage(
          max_rows=max_rows - len(rows),
          page_token=page_token)
      if not schema and current_schema:
        schema = current_schema.get('fields', {})

      max_rows = min(max_rows, total_rows)
      for row in more_rows:
        rows.append([entry.get('v', '') for entry in row.get('f', [])])
      if not page_token and len(rows) != max_rows:
          raise BigqueryInterfaceError(
            'PageToken missing for %r' % (self,))
      if not more_rows and len(rows) != max_rows:
        raise BigqueryInterfaceError(
            'Not enough rows returned by server for %r' % (self,))
    return (schema, rows)

  def _ReadOnePage(self, max_rows, page_token=None):
    data = self._apiclient.jobs().getQueryResults(
        maxResults=max_rows,
        pageToken=page_token,
        # Sets the timeout to 0 because we assume the table is already ready.
        timeoutMs=0,
        projectId=self.project_id,
        jobId=self.job_id).execute()
    if not data['jobComplete']:
      raise BigqueryError('Job %s is not done' % (self,))
    page_token = data.get('pageToken', None)
    total_rows = int(data['totalRows'])
    schema = data.get('schema', None)
    rows = data.get('rows', [])
    return (rows, page_token, total_rows, schema)
于 2013-07-19T21:02:06.967 に答える