Google Cloud Functionから Cloud Dataflow ジョブを起動するにはどうすればよいですか? クロスサービス合成を可能にする仕組みとして、Google Cloud Functions を利用したいと考えています。
			
			2181 次
		
2 に答える
            9        
        
		
以下に、 WordCountサンプルの非常に基本的な例を示します。Cloud Function デプロイメントには Java バイナリのコピーを含める必要があることに注意してください。これはデフォルト環境にないためです。同様に、デプロイ jar も Cloud Function と一緒にパッケージ化する必要があります。
module.exports = {
  wordcount: function (context, data) {
    const spawn = require('child_process').spawn;
    const child = spawn(
            'jre1.8.0_73/bin/java',
            ['-cp',
             'MY_JAR.jar',
             'com.google.cloud.dataflow.examples.WordCount',
             '--jobName=fromACloudFunction',
             '--project=MY_PROJECT',
             '--runner=BlockingDataflowPipelineRunner',
             '--stagingLocation=gs://STAGING_LOCATION',
             '--inputFile=gs://dataflow-samples/shakespeare/*',
             '--output=gs://OUTPUT_LOCATION'
            ],
            { cwd: __dirname });
    child.stdout.on('data', function(data) {
      console.log('stdout: ' + data);
    });
    child.stderr.on('data', function(data) {
      console.log('error: ' + data);
    });
    child.on('close', function(code) {
      console.log('closing code: ' + code);
    });
    context.success();
  }
}
ノンブロッキング ランナーを使用し、関数がジョブ ID を返すようにすることで、この例をさらに拡張して、ジョブの完了を個別にポーリングできるようにすることができます。このパターンは、依存関係を Cloud Function にパッケージ化できる限り、他の SDK でも有効である必要があります。
于 2016-02-15T17:51:04.867   に答える
    
    
            1        
        
		
クラウド機能を使用して起動するのが最善の方法ですが、Google クラウド ストレージにクラウド機能を使用している場合は、ファイルがアップロードされるたびにデータフロー ジョブが起動されることに注意してください。
const { google } = require('googleapis');
const templatePath = "gs://template_dir/df_template;
const project = "<project_id>";
const tempLoc = "gs://tempLocation/";
exports.PMKafka = (data, context, callback) => {
    const file = data;
    console.log(`Event ${context.eventId}`);
    console.log(`Event Type: ${context.eventType}`);
    console.log(`Bucket Name: ${file.bucket}`);
    console.log(`File Name: ${file.name}`);
    console.log(`Metageneration: ${file.metageneration}`);
    console.log(`Created: ${file.timeCreated}`);
    console.log(`Updated: ${file.updated}`);
    console.log(`Uploaded File Name - gs://${file.bucket}/${file.name}`);
    google.auth.getApplicationDefault(function (err, authClient, projectId) {
        if (err) {
            throw err;
        }
        if (authClient.createScopedRequired && authClient.createScopedRequired()) {
            authClient = authClient.createScoped(authScope);
        }
        const dataflow = google.dataflow({ version: 'v1b3', auth: authClient });
        var inputDict= {
            inputFile: `gs://${file.bucket}/${file.name}`,
            ...
            ...
            <other_runtime_parameters>
        };
        var env = {
            tempLocation: tempLoc
        };
        var resource_opts = {
            parameters: inputDict,
            environment: env,
            jobName: config.jobNamePrefix + "-" + new Date().toISOString().toLowerCase().replace(":","-").replace(".","-")
        };
        var opts = {
            gcsPath: templatePath,
            projectId: project,
            resource: resource_opts
        }
        console.log(`Dataflow Run Time Options - ${JSON.stringify(opts)}`)
        dataflow.projects.templates.launch(opts, function (err, response) {
            if (err) {
                console.error("problem running dataflow template, error was: ", err);
                slack.publishMessage(null, null, false, err);
                return;
            }
            console.log("Dataflow template response: ", response);
            var jobid = response["data"]["job"]["id"];
            console.log("Dataflow Job ID: ", jobid);
        });
        callback();
    });
};
    于 2020-05-23T23:04:36.437   に答える