12

I am trying to write a nodejs sqs queue processor.

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) {
      if (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined') {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              throw error;
            }
            console.log('stdout: ' + stdout);
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              });
            }
          });
      }
    }
  });
}
readMessage();

The above code works fine for single message in queue. How should I write this script so that it keeps polling for messages in queue untill all messages are processed? Should i use set timeout?

4

2 に答える 2

16

まず第一に、 Amazon が提供するロング ポーリング手法を明確に使用する必要があります。私が理解しているように、呼び出しに"WaitTimeSeconds": 20引数があるため、既に使用しています。AWS Web インターフェイスsqs.receiveMessageでの設定を忘れていないことを願っています。

メッセージのポーリングについて-タイマーを含むさまざまな手法を使用できますが、最も単純なのは、の(またはの)コールバック関数のreadMessage()最後で関数を呼び出すだけだと思います。そのため、キュー内の次のメッセージの処理 (または待機中) は、キュー内の前のメッセージの処理が終了した直後に開始されます。receiveMessageexec

アップデート:

あなたの新しいバージョンのコードでは、多くのreadMessage()呼び出しがあります。コードをより明確にし、維持しやすくするために、それを最小限に抑える方が良いと思います。しかし、たとえば、メインのreceiveMessageコールバックの最後の 1 つの呼び出しだけを残すと、並行して実行されている多数の PHP ワーカー スクリプトを受け取ることになります。パフォーマンスの観点からはそれほど悪くはないかもしれませんが、並列ワーカーの量を制御するために複雑なスクリプトを追加する必要があります。execコールバックでいくつかの呼び出しを切断し、 ifs に参加して、メインのコールバックで呼び出しに参加できると思います。

"use strict";
var appConf = require('./config/appConf');
var AWS = require('aws-sdk');
AWS.config.loadFromPath('./config/aws_config.json');
var delay = 20 * 1000;
var sqs = new AWS.SQS();
var exec = require('child_process').exec;
function readMessage() {
  sqs.receiveMessage({
    "QueueUrl": appConf.sqs_distribution_url,
    "MaxNumberOfMessages": 1,
    "VisibilityTimeout": 30,
    "WaitTimeSeconds": 20
  }, function (err, data) {
    var sqs_message_body;
    if (data.Messages) 
      && (typeof data.Messages[0] !== 'undefined' && typeof data.Messages[0].Body !== 'undefined')) {
        //sqs msg body
        sqs_message_body = JSON.parse(data.Messages[0].Body);
        //make call to nodejs handler in codeigniter
        exec('php '+ appConf.CI_FC_PATH +'/index.php nodejs_handler make_contentq_call "'+ sqs_message_body.contentq_cat_id+'" "'+sqs_message_body.cnhq_cat_id+'" "'+sqs_message_body.network_id+'"',
          function (error, stdout, stderr) {
            if (error) {
              // error handling 
            }
            if(stdout == 'Success'){
              //delete message from queue
              sqs.deleteMessage({
                "QueueUrl" : appConf.sqs_distribution_url,
                "ReceiptHandle" :data.Messages[0].ReceiptHandle
              }, function(err, data){                
              });
            }
            readMessage();                
          });
      }          
    }        
    readMessage();        
  });
}
readMessage();

メモリ リークについて: の次の呼び出しはコールバック関数で発生するため、心配する必要はないと思います。つまり、readMessage()再帰的ではなく、再帰的に呼び出された関数は、関数を呼び出した直後に親関数に値を返しますreceiveMessage()

于 2013-07-23T09:07:40.090 に答える