1

ユースケースとして AWS のサービスを学習しています。ドキュメントを調べた後、単純なフローを思いつきました。Streams API と KPL を使用して、データを Kinesis ストリームに取り込みたいと考えています。サンプルの putRecord メソッドを使用して、データをストリームに取り込みます。この JSON をストリームに取り込みます -

{"userid":1234,"username":"jDoe","firstname":"John","lastname":"Doe"}

データが取り込まれると、putRecordResult で次の応答を取得します -

Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318638512162631666140828401666}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318645765717549353915876638722}
Put Result :{ShardId: shardId-000000000000,SequenceNumber: 49563097246355834103398973318649392495008197803400757250}

次に、これらのデータを取得して DynamoDB テーブルにプッシュする Lambda 関数を作成します。ここに私のラムダ関数があります -

console.log('Loading function');
var AWS = require('aws-sdk');
var tableName = "sampleTable";
var doc = require('dynamodb-doc');
var db = new doc.DynamoDB();

exports.handler = (event, context, callback) => {
    //console.log('Received event:', JSON.stringify(event, null, 2));
    event.Records.forEach((record) => {
        // Kinesis data is base64 encoded so decode here
        const payload = new Buffer(record.kinesis.data, 'base64').toString('ascii');
        console.log('Decoded payload:', payload);
        var userid = event.userid;
        var username = event.username;
        var firstname = event.firstname;
        console.log(userid + "," + username +","+ firstname);

        var item = {
            "userid" : userid,
            "username" : username,
            "firstname" : firstname
        };

        var params = {
            TableName : tableName,
            Item : item
        };
        console.log(params);

        db.putItem(params, function(err, data){
            if(err) console.log(err);
            else console.log(data);
        });

    });
    callback(null, `Successfully processed ${event.Records.length} records.`);
};

どういうわけか、ラムダ関数の実行で console.logs を確認できません。ストリーム ページには、ストリームに putRecord があり、同様に取得されていることがわかりますが、どういうわけか、Lambdafunction ページにも DynamoDB テーブルにも何も表示されません。

Kinesis にデータを取り込むための Java コード用の IAM ポリシー、lambda-kinesis-execution-role である Lambda 関数用の別のポリシー、および DynamoDB がデータをテーブルに取り込むためのポリシーがあります。

それが正しい方法でどのように行われるかを示すチュートリアルはありますか? このプロセスで多くの点が欠けていると感じています。たとえば、データがストリームに入れられたときに Lambda によって処理されて Dynamo で終わるように、これらすべての IAM ポリシーをリンクして同期させる方法などです。

任意のポインタとヘルプは深く感謝しています.

4

2 に答える 2

1

上記のコードが使用しているコードの直接コピーである場合、参照してevent.useridいますが、使用する必要がありますpayload.userid。Kinesis レコードをペイロード変数にデコードしました。

于 2016-06-29T04:14:37.707 に答える