SDK のバージョン 1.9.36 を使用して、Google App Engine で Java アプリケーションを実行しています。アプリケーションは、Datastore、BigQuery、Cloud Storage にアクセスできる Java 7 を実行しています。コンテナーは B8 クラスのバックエンド サーバーであり、いくつかの基本的なチェックを行ってから TaskQueue エントリを送信する受信サーブレットによってタスクが送信されます。
私が直面している問題は、アプリケーションが単に応答を停止することです。データは、JobQueue を使用して BigQuery テーブルから読み取られます。ジョブのステータスが RUNNING から DONE になることを確認している間、アプリケーションは単にログと処理を停止します。この待機反復回数は、アプリケーションが停止する場所によって異なります。
一貫性がないため、特定のコードに固定することはできません。
アプリケーションが何日も問題なく動作する日もあれば、停止せずに 1 回の反復を完了することさえできない日もあります。
テーブル内のデータは、600 ~ 6000 行の間で変化します。一度に 2000 行のチャンクを読み取ります。
「Job done - let process results」の部分に到達して、単に停止することがあります。いくつかの RUNNING メッセージをログに記録して停止することがあります。
これは、ジョブ パラメータを設定してからジョブを開始する部分です。do while ループのコードを省略しました。{} それはビットを処理しているだけですが、正常に動作します。どこかでデータの取得に問題があるようです。
public List<String> retrieveMergedTableTxIds(Date runDate) throws ProcessorCustomException {
List<String> existingIds = new ArrayList<>();
BigQueryQryStringBuilder queryStringBuilder = new BigQueryQryStringBuilderImpl();
String tempTableName = queryStringBuilder.buildTempTableName();
String qryString;
try {
qryString = queryStringBuilder.buildMergeTableQuery(ApplicationConstants.projectName, ApplicationConstants.DATASET_ID,
ApplicationConstants.TRXID_VIEW, runDate);
logger.info("Query string is |" + qryString);
JobQueryParameters jobQueryParameters = new JobQueryParameters();
jobQueryParameters.setBigquery(bigquery);
jobQueryParameters.setProjectId(ApplicationConstants.projectName);
jobQueryParameters.setDatasetId(ApplicationConstants.DATASET_ID);
jobQueryParameters.setQuerySql(qryString);
jobQueryParameters.setTempTableName(tempTableName);
JobReference jobId = startQuery(jobQueryParameters);
logger.fine("JobID for submitted job is " + jobId.getJobId());
logger.fine("Polling job for DONE status");
TableReference completedJob = pollJobStatus(jobQueryParameters.getBigquery(), ApplicationConstants.projectName, jobId);
logger.fine("Job done - let process results!");
Job job = jobQueryParameters.getBigquery().jobs().get(ApplicationConstants.projectName, jobId.getJobId()).execute();
logger.fine("JobID is " + job.getId());
logger.fine("JobID is " + job.getId());
GetQueryResultsResponse response = jobQueryParameters.getBigquery().jobs()
.getQueryResults(ApplicationConstants.projectName, job.getJobReference().getJobId()).execute();
logger.fine("Response total rows is " + response.getTotalRows());
// Default to not looping
boolean moreResults = false;
String pageToken = null;
do {
logger.fine("Insize the per-token do-while loop");
TableDataList queryResult = jobQueryParameters.getBigquery().tabledata()
.list(completedJob.getProjectId(), completedJob.getDatasetId(), completedJob.getTableId())
.setMaxResults(ApplicationConstants.MAX_RESULTS).setPageToken(pageToken).execute();
logger.info("Value for isEmpty is " + queryResult.isEmpty());
if (!queryResult.isEmpty() && queryResult != null) {
logger.fine("Row size for token is " + queryResult.size());
}
} while (moreResults);
public TableReference pollJobStatus(Bigquery bigquery, String projectId, JobReference jobId) throws IOException,
InterruptedException {
while (true) {
Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId()).execute();
logger.info("Job status for JobId " + pollJob.getId() + " is " + pollJob.getStatus().getState());
if (pollJob.getStatus().getState().equals("DONE")) {
logger.info("Returning the TableReference in pollJobStatus");
return pollJob.getConfiguration().getQuery().getDestinationTable();
}
// Pause execution for one second before polling job status again,
// to
// reduce unnecessary calls to the BigQUery API and lower overall
// application bandwidth.
Thread.sleep(2000);
}
}
public JobReference startQuery(JobQueryParameters jobQueryParameters) throws IOException {
Job job = new Job();
JobConfiguration config = new JobConfiguration();
JobConfigurationQuery queryConfig = new JobConfigurationQuery();
queryConfig.setAllowLargeResults(true);
TableReference reference = new TableReference();
reference.setProjectId(jobQueryParameters.getProjectId());
reference.setDatasetId(jobQueryParameters.getDatasetId());
reference.setTableId(jobQueryParameters.getTempTableName());
Table table = new Table();
table.setId(jobQueryParameters.getTempTableName());
table.setExpirationTime(Calendar.getInstance().getTimeInMillis() + 360000L);
table.setTableReference(reference);
jobQueryParameters.getBigquery().tables()
.insert(jobQueryParameters.getProjectId(), jobQueryParameters.getDatasetId(), table).execute();
queryConfig.setDestinationTable(reference);
config.setQuery(queryConfig);
job.setConfiguration(config);
queryConfig.setQuery(jobQueryParameters.getQuerySql());
Insert insert = jobQueryParameters.getBigquery().jobs().insert(jobQueryParameters.getProjectId(), job);
insert.setProjectId(jobQueryParameters.getProjectId());
JobReference jobId = insert.execute().getJobReference();
return jobId;
}
AppEngine コンソールを見ると、インスタンスはまだ実行中として表示されますが、処理されたリクエストを示すグラフも停止します。
コードの変更や再デプロイなしで動作が非常に不安定になる、同様の経験をした人はいますか?