Google Cloud Functionから Cloud Dataflow ジョブを起動するにはどうすればよいですか? クロスサービス合成を可能にする仕組みとして、Google Cloud Functions を利用したいと考えています。
2181 次
2 に答える
9
以下に、 WordCountサンプルの非常に基本的な例を示します。Cloud Function デプロイメントには Java バイナリのコピーを含める必要があることに注意してください。これはデフォルト環境にないためです。同様に、デプロイ jar も Cloud Function と一緒にパッケージ化する必要があります。
module.exports = {
wordcount: function (context, data) {
const spawn = require('child_process').spawn;
const child = spawn(
'jre1.8.0_73/bin/java',
['-cp',
'MY_JAR.jar',
'com.google.cloud.dataflow.examples.WordCount',
'--jobName=fromACloudFunction',
'--project=MY_PROJECT',
'--runner=BlockingDataflowPipelineRunner',
'--stagingLocation=gs://STAGING_LOCATION',
'--inputFile=gs://dataflow-samples/shakespeare/*',
'--output=gs://OUTPUT_LOCATION'
],
{ cwd: __dirname });
child.stdout.on('data', function(data) {
console.log('stdout: ' + data);
});
child.stderr.on('data', function(data) {
console.log('error: ' + data);
});
child.on('close', function(code) {
console.log('closing code: ' + code);
});
context.success();
}
}
ノンブロッキング ランナーを使用し、関数がジョブ ID を返すようにすることで、この例をさらに拡張して、ジョブの完了を個別にポーリングできるようにすることができます。このパターンは、依存関係を Cloud Function にパッケージ化できる限り、他の SDK でも有効である必要があります。
于 2016-02-15T17:51:04.867 に答える
1
クラウド機能を使用して起動するのが最善の方法ですが、Google クラウド ストレージにクラウド機能を使用している場合は、ファイルがアップロードされるたびにデータフロー ジョブが起動されることに注意してください。
const { google } = require('googleapis');
const templatePath = "gs://template_dir/df_template;
const project = "<project_id>";
const tempLoc = "gs://tempLocation/";
exports.PMKafka = (data, context, callback) => {
const file = data;
console.log(`Event ${context.eventId}`);
console.log(`Event Type: ${context.eventType}`);
console.log(`Bucket Name: ${file.bucket}`);
console.log(`File Name: ${file.name}`);
console.log(`Metageneration: ${file.metageneration}`);
console.log(`Created: ${file.timeCreated}`);
console.log(`Updated: ${file.updated}`);
console.log(`Uploaded File Name - gs://${file.bucket}/${file.name}`);
google.auth.getApplicationDefault(function (err, authClient, projectId) {
if (err) {
throw err;
}
if (authClient.createScopedRequired && authClient.createScopedRequired()) {
authClient = authClient.createScoped(authScope);
}
const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
var inputDict= {
inputFile: `gs://${file.bucket}/${file.name}`,
...
...
<other_runtime_parameters>
};
var env = {
tempLocation: tempLoc
};
var resource_opts = {
parameters: inputDict,
environment: env,
jobName: config.jobNamePrefix + "-" + new Date().toISOString().toLowerCase().replace(":","-").replace(".","-")
};
var opts = {
gcsPath: templatePath,
projectId: project,
resource: resource_opts
}
console.log(`Dataflow Run Time Options - ${JSON.stringify(opts)}`)
dataflow.projects.templates.launch(opts, function (err, response) {
if (err) {
console.error("problem running dataflow template, error was: ", err);
slack.publishMessage(null, null, false, err);
return;
}
console.log("Dataflow template response: ", response);
var jobid = response["data"]["job"]["id"];
console.log("Dataflow Job ID: ", jobid);
});
callback();
});
};
于 2020-05-23T23:04:36.437 に答える