0

SDK のバージョン 1.9.36 を使用して、Google App Engine で Java アプリケーションを実行しています。アプリケーションは、Datastore、BigQuery、Cloud Storage にアクセスできる Java 7 を実行しています。コンテナーは B8 クラスのバックエンド サーバーであり、いくつかの基本的なチェックを行ってから TaskQueue エントリを送信する受信サーブレットによってタスクが送信されます。

私が直面している問題は、アプリケーションが単に応答を停止することです。データは、JobQueue を使用して BigQuery テーブルから読み取られます。ジョブのステータスが RUNNING から DONE になることを確認している間、アプリケーションは単にログと処理を停止します。この待機反復回数は、アプリケーションが停止する場所によって異なります。

一貫性がないため、特定のコードに固定することはできません。

アプリケーションが何日も問題なく動作する日もあれば、停止せずに 1 回の反復を完了することさえできない日もあります。

テーブル内のデータは、600 ~ 6000 行の間で変化します。一度に 2000 行のチャンクを読み取ります。

「Job done - let process results」の部分に到達して、単に停止することがあります。いくつかの RUNNING メッセージをログに記録して停止することがあります。

これは、ジョブ パラメータを設定してからジョブを開始する部分です。do while ループのコードを省略しました。{} それはビットを処理しているだけですが、正常に動作します。どこかでデータの取得に問題があるようです。

public List<String> retrieveMergedTableTxIds(Date runDate) throws ProcessorCustomException {

    List<String> existingIds = new ArrayList<>();

    BigQueryQryStringBuilder queryStringBuilder = new BigQueryQryStringBuilderImpl();

    String tempTableName = queryStringBuilder.buildTempTableName();

    String qryString;

    try {
        qryString = queryStringBuilder.buildMergeTableQuery(ApplicationConstants.projectName, ApplicationConstants.DATASET_ID,
                ApplicationConstants.TRXID_VIEW, runDate);

        logger.info("Query string is |" + qryString);

        JobQueryParameters jobQueryParameters = new JobQueryParameters();
        jobQueryParameters.setBigquery(bigquery);
        jobQueryParameters.setProjectId(ApplicationConstants.projectName);
        jobQueryParameters.setDatasetId(ApplicationConstants.DATASET_ID);
        jobQueryParameters.setQuerySql(qryString);
        jobQueryParameters.setTempTableName(tempTableName);


        JobReference jobId = startQuery(jobQueryParameters);

        logger.fine("JobID for submitted job is " + jobId.getJobId());
        logger.fine("Polling job for DONE status");

        TableReference completedJob = pollJobStatus(jobQueryParameters.getBigquery(), ApplicationConstants.projectName, jobId);

        logger.fine("Job done - let process results!");

        Job job = jobQueryParameters.getBigquery().jobs().get(ApplicationConstants.projectName, jobId.getJobId()).execute();

        logger.fine("JobID is " + job.getId());

        logger.fine("JobID is " + job.getId());

        GetQueryResultsResponse response = jobQueryParameters.getBigquery().jobs()
                .getQueryResults(ApplicationConstants.projectName, job.getJobReference().getJobId()).execute();

        logger.fine("Response total rows is " + response.getTotalRows());

        // Default to not looping
        boolean moreResults = false;
        String pageToken = null;

        do {
            logger.fine("Insize the per-token do-while loop");

            TableDataList queryResult = jobQueryParameters.getBigquery().tabledata()
                    .list(completedJob.getProjectId(), completedJob.getDatasetId(), completedJob.getTableId())
                    .setMaxResults(ApplicationConstants.MAX_RESULTS).setPageToken(pageToken).execute();

            logger.info("Value for isEmpty is " + queryResult.isEmpty());

            if (!queryResult.isEmpty() && queryResult != null) {
                logger.fine("Row size for token is " + queryResult.size());
            }


        } while (moreResults);
public TableReference pollJobStatus(Bigquery bigquery, String projectId, JobReference jobId) throws IOException,
        InterruptedException {

    while (true) {
        Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId()).execute();

        logger.info("Job status for JobId " + pollJob.getId() + " is " + pollJob.getStatus().getState());

        if (pollJob.getStatus().getState().equals("DONE")) {
            logger.info("Returning the TableReference in pollJobStatus");
            return pollJob.getConfiguration().getQuery().getDestinationTable();
        }
        // Pause execution for one second before polling job status again,
        // to
        // reduce unnecessary calls to the BigQUery API and lower overall
        // application bandwidth.
        Thread.sleep(2000);
    }
}

public JobReference startQuery(JobQueryParameters jobQueryParameters) throws IOException {

    Job job = new Job();
    JobConfiguration config = new JobConfiguration();
    JobConfigurationQuery queryConfig = new JobConfigurationQuery();

    queryConfig.setAllowLargeResults(true);

    TableReference reference = new TableReference();
    reference.setProjectId(jobQueryParameters.getProjectId());
    reference.setDatasetId(jobQueryParameters.getDatasetId());
    reference.setTableId(jobQueryParameters.getTempTableName());

    Table table = new Table();
    table.setId(jobQueryParameters.getTempTableName());
    table.setExpirationTime(Calendar.getInstance().getTimeInMillis() + 360000L);
    table.setTableReference(reference);

    jobQueryParameters.getBigquery().tables()
            .insert(jobQueryParameters.getProjectId(), jobQueryParameters.getDatasetId(), table).execute();

    queryConfig.setDestinationTable(reference);

    config.setQuery(queryConfig);

    job.setConfiguration(config);

    queryConfig.setQuery(jobQueryParameters.getQuerySql());

    Insert insert = jobQueryParameters.getBigquery().jobs().insert(jobQueryParameters.getProjectId(), job);
    insert.setProjectId(jobQueryParameters.getProjectId());

    JobReference jobId = insert.execute().getJobReference();

    return jobId;
}

AppEngine コンソールを見ると、インスタンスはまだ実行中として表示されますが、処理されたリクエストを示すグラフも停止します。

コードの変更や再デプロイなしで動作が非常に不安定になる、同様の経験をした人はいますか?

4

0 に答える 0