3

S3 put イベント (ログファイル) に応答し、ファイルを読み取り、COPY コマンドを介して Postgres テーブルに挿入するAWS Lambda実行スクリプトを node.js で作成しています。logToPostgresデータベースへの書き込み (以下のスクリプト)を除くすべてが期待どおりに機能しているようです。

いくつかのメモ:

  • ここでは関係ないので、S3 put イベントであることを確認するためにチェックする部分と、その他のエラー処理コードを削除しました。
  • データベース ユーザーはINSERT、テーブルとALLデータベースに対するアクセス許可を持ち、任意の IP からアクセスできます (すべて検証済み)。
  • secrets.jsデータベース資格情報をエクスポートする同じディレクトリ内のモジュールです
  • スクリプトをローカルで実行すると、データベースに問題なく書き込むことができます。
  • AWS Lambda の制限に達していません — S3 からダウンロードされたファイルは 521 バイトで、タイムアウトは最大 60 秒に設定されています (テストと同じ DB への書き込みでは 300 ミリ秒未満で実行されます)。

stream.pipe(query)...クラウド ウォッチにエラーはなく、すべてのステップでログを追加することで、コードの部分に絞り込むことができました。何らかの理由で、この部分は AWS Lambda によって実行されていませんが、ローカルでは正常に実行されています。'finished'andイベントを発行していない'end'ため、実行されないままになっていると思います。

問題がどこにあるのかについて何か考えはありますか?

var async = require('async');
var fs = require('fs');
var aws = require('aws-sdk');
var s3 = new aws.S3();
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');

exports.handler = function(event, context) {
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    async.waterfall(
        [
            function downloadWebhook(next) {
                s3.getObject({Bucket: bucket, Key: key}, next);
            },
            function saveToDisk(response, next) {
                var file = fs.createWriteStream('/tmp/foo_' + Date.now());
                file.write(response.Body);
                file.close();
                next(null, file.path);
            },
            function createStdinStream(path, next) {
                next(null, fs.createReadStream(path));
            },
            function logToPostgres(stream, next) {
                var client = new pg.Client('pg://' + secrets.user + ':' +
                    secrets.password + '@' + secrets.host + ':' +
                    secrets.port + '/' + secrets.database);
                client.connect(function (error) {
                    if (error) console.error(error);
                    var query = client.query(pgCopy('COPY my_table FROM STDIN'));
                    stream.pipe(query)
                        .on('finish', function () {
                            client.end();
                            next(null, null);
                        });
                });
            }
        ],
        function (error) {
            if (error) console.error(error);
            context.done();
        }
    });
};

アップデート:

書き込み可能なストリームがイベントを発行するようになったことが判明した'finish'ため、それを変更し'finish'て回答からの提案を含めると、エラーなしで実行されます。ただし、ラムダの実行後、データベースにはまだ行がありません。トランザクションがロールバックされているのではないかと思いますが、その理由や場所を特定できません。私は明示的にトランザクションを開始してコミットしようとしましたが、サイコロはありませんでした。

4

2 に答える 2

0

context.done()Lambda 関数の実行をすぐに停止します。したがって、 を呼び出すnext(null, null)とウォーターフォールが完了しますが、Postgres クエリは非同期であるため、完了まで実行されません。

代わりに試してください:

stream.pipe(query)
    .on('end', function() {
        client.end();
        next(null, null);
    })
    .on('error', context.fail);

ここで、ストリームが終了した後にのみウォーターフォールを解決する方法に注意してください。

于 2015-04-10T23:28:26.757 に答える
0

ソリューションの何が問題だったのかわかりません。これが私のテスト済みのよりコンパクトなソリューションです。これがうまくいくかどうか教えてください!

var aws = require('aws-sdk');
var s3 = new aws.S3();
var S3S = require('s3-streams');
var pg = require('pg');
var pgCopy = require('pg-copy-streams').from;
var secrets = require('./secrets.js');

exports.handler = function(event, context) {
    var bucket = event.Records[0].s3.bucket.name;
    var key = event.Records[0].s3.object.key;

    var stream = S3S.ReadStream(s3, {Bucket: bucket, Key: key});
    pg.connect(secrets.connector, function(err, client) {
        if (err) console.log(err);
        var query = client.query(pgCopy(
            "COPY event_log(user_id, event, ...) FROM STDIN CSV"
        ));
        stream.pipe(query)
            .on('end', function () {
                client.end();
                context.done();
            })
            .on('error', function(error) {
                console.log(error);
            });
    });
};
于 2015-07-23T18:56:06.243 に答える