33

SQSに複数のメッセージがあります。次のコードは、数十が表示されている場合でも(飛行中ではない)、 常に1つだけを返します。setMaxNumberOfMessages一度に複数を消費できると思いました..これを誤解しましたか?

 CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
 String queueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();
 ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl);
 receiveMessageRequest.setMaxNumberOfMessages(10);
 List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
 for (Message message : messages) {
      // i'm a message from SQS
 }

私はまた、そのような運なしでwithMaxNumberOfMessagesを使用してみました:

 receiveMessageRequest.withMaxNumberOfMessages(10);

キューにメッセージがあることをどうやって知ることができますか?1つ以上?

 Set<String> attrs = new HashSet<String>();
 attrs.add("ApproximateNumberOfMessages");
 CreateQueueRequest createQueueRequest = new CreateQueueRequest().withQueueName(queueName);
 GetQueueAttributesRequest a = new GetQueueAttributesRequest().withQueueUrl(sqs.createQueue(createQueueRequest).getQueueUrl()).withAttributeNames(attrs);
 Map<String,String> result = sqs.getQueueAttributes(a).getAttributes();
 int num = Integer.parseInt(result.get("ApproximateNumberOfMessages"));

上記は常に前に実行され、int>1である

ご入力いただきありがとうございます

4

8 に答える 8

40

AWS APIリファレンスガイド:Query / QueryReceiveMessage

キューの分散性により、ReceiveMessage呼び出しで、重み付けされたランダムなマシンのセットがサンプリングされます。つまり、サンプリングされたマシン上のメッセージのみが返されます。キュー内のメッセージの数が少ない場合(1000未満)、ReceiveMessage呼び出しごとに要求したよりも少ないメッセージを受け取る可能性があります。キュー内のメッセージの数が非常に少ない場合、特定のReceiveMessage応答でメッセージを受信しない可能性があります。その場合、リクエストを繰り返す必要があります。

MaxNumberOfMessages:返されるメッセージの最大数。SQSが返すメッセージがこの値より多くなることはありませんが、返されるメッセージが少なくなる可能性があります。

于 2012-03-31T21:34:47.590 に答える
10

SQSリファレンスドキュメントには、この(おそらくかなり特異な)動作の包括的な説明があります。

SQSはメッセージのコピーを複数のサーバーに保存し、メッセージの受信要求は2つの可能な戦略のいずれかを使用してこれらのサーバーに対して行われます。

  • ショートポーリング:デフォルトの動作であり、サーバーのサブセット(加重ランダム分布に基づく)のみが照会されます。
  • ロングポーリング:WaitTimeSeconds属性をゼロ以外の値に設定することで有効になり、すべてのサーバーが照会されます。

実際には、私の限られたテストでは、あなたがしたように、私はいつも短いポーリングで1つのメッセージを受け取るようです。

于 2014-09-13T22:28:57.253 に答える
5

私も同じ問題を抱えていました。キューが設定されているメッセージ受信待機時間はどれくらいですか?私が0のとき、キューに8つあったとしても、1つのメッセージしか返しませんでした。受信メッセージの待機時間を増やすと、すべてが取得されました。私にはちょっとバグがあるようです。

于 2013-08-31T00:21:18.390 に答える
5

私はちょうど同じことを試みていて、これらの2つの属性setMaxNumberOfMessagesとsetWaitTimeSecondsの助けを借りて、10個のメッセージを取得することができました。

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);
                      receiveMessageRequest.setMaxNumberOfMessages(10);
                      receiveMessageRequest.setWaitTimeSeconds(20);

o / pのスナップショット:

Receiving messages from TestQueue.
Number of messages:10
Message
MessageId:     31a7c669-1f0c-4bf1-b18b-c7fa31f4e82d 
...
于 2017-07-26T09:40:55.760 に答える
1

receiveMessageRequest.withMaxNumberOfMessages(10);

明確にするために、これのより実用的な使用法は、次のようにコンストラクターに追加することです。

ReceiveMessageRequest receiveMessageRequest = new ReceiveMessageRequest(queueUrl).withMaxNumberOfMessages(10);

それ以外の場合は、次のようにすることもできます。

receiveMessageRequest.setMaxNumberOfMessages(10);

そうは言っても、これを変更しても元の問題は解決しません。

于 2012-06-15T17:55:36.437 に答える
1

Caoilteに感謝します!

私もこの問題に直面しました。ロングポーリングを使用して最終的に解決するには、次の設定に従います: https ://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-configure-long-polling-for-queue.html

残念ながら、長いポーリングを使用するには、キューをFIFO1として作成する必要があります。運が悪かったので標準キューを試しました。

また、受信時にMaxNumberOfMessagesも設定する必要があります。したがって、私のコードは次のようになります。

ReceiveMessageRequest receive_request = new ReceiveMessageRequest().withQueueUrl(QUEUE_URL).withWaitTimeSeconds(20).withMaxNumberOfMessages(10);

解決されましたが、それでも配線が多すぎると感じます。AWSは、この種の基本的な受信操作のために、よりきちんとしたAPIを確実に提供する必要があります。

私の見解では、AWSには多くの優れた機能がありますが、優れたAPIはありません。それらの人のようにいつも急いでいます。

于 2019-03-13T18:54:22.913 に答える
1

小さなタスクリストの場合、 stackoverflow.com / a / 55149351/13678017のようなFIFOキューを使用します 。たとえば、変更されたAWSチュートリアル

            // Create a queue.
        System.out.println("Creating a new Amazon SQS FIFO queue called " + "MyFifoQueue.fifo.\n");
        final Map<String, String> attributes = new HashMap<>();

        // A FIFO queue must have the FifoQueue attribute set to true.
        attributes.put("FifoQueue", "true");
        /*
         * If the user doesn't provide a MessageDeduplicationId, generate a
         * MessageDeduplicationId based on the content.
         */
        attributes.put("ContentBasedDeduplication", "true");
        // The FIFO queue name must end with the .fifo suffix.
        final CreateQueueRequest createQueueRequest = new CreateQueueRequest("MyFifoQueue4.fifo")
                        .withAttributes(attributes);
        final String myQueueUrl = sqs.createQueue(createQueueRequest).getQueueUrl();

        // List all queues.
        System.out.println("Listing all queues in your account.\n");
        for (final String queueUrl : sqs.listQueues().getQueueUrls()) {
            System.out.println("  QueueUrl: " + queueUrl);
        }
        System.out.println();

        // Send a message.
        System.out.println("Sending a message to MyQueue.\n");

        for (int i = 0; i < 4; i++) {

            var request = new SendMessageRequest()
                    .withQueueUrl(myQueueUrl)
                    .withMessageBody("message " + i)
                    .withMessageGroupId("userId1");
                    ;

            sqs.sendMessage(request);
        }

        for (int i = 0; i < 6; i++) {

            var request = new SendMessageRequest()
                    .withQueueUrl(myQueueUrl)
                    .withMessageBody("message " + i)
                    .withMessageGroupId("userId2");
                    ;

            sqs.sendMessage(request);
        }

        // Receive messages.
        System.out.println("Receiving messages from MyQueue.\n");
        var receiveMessageRequest = new ReceiveMessageRequest(myQueueUrl);

        receiveMessageRequest.setMaxNumberOfMessages(10);
        receiveMessageRequest.setWaitTimeSeconds(20);

        // what receive?
        receiveMessageRequest.withMessageAttributeNames("userId2");


        final List<Message> messages = sqs.receiveMessage(receiveMessageRequest).getMessages();
        for (final Message message : messages) {
            System.out.println("Message");
            System.out.println("  MessageId:     "
                    + message.getMessageId());
            System.out.println("  ReceiptHandle: "
                    + message.getReceiptHandle());
            System.out.println("  MD5OfBody:     "
                    + message.getMD5OfBody());
            System.out.println("  Body:          "
                    + message.getBody());
            for (final Entry<String, String> entry : message.getAttributes()
                    .entrySet()) {
                System.out.println("Attribute");
                System.out.println("  Name:  " + entry
                        .getKey());
                System.out.println("  Value: " + entry
                        .getValue());
            }
        }
于 2020-06-04T07:27:30.033 に答える
0

回避策は次のとおりです。receiveMessageFromSQSメソッドを非同期で呼び出すことができます。

   bulkReceiveFromSQS (queueUrl, totalMessages, asyncLimit, batchSize, visibilityTimeout, waitTime, callback) {
    batchSize = Math.min(batchSize, 10);

    let self = this,
        noOfIterations = Math.ceil(totalMessages / batchSize);

    async.timesLimit(noOfIterations, asyncLimit, function(n, next) {
        self.receiveMessageFromSQS(queueUrl, batchSize, visibilityTimeout, waitTime,
            function(err, result) {
                if (err) {
                    return next(err);
                }

                return next(null, _.get(result, 'Messages'));
            });
    }, function (err, listOfMessages) {
        if (err) {
            return callback(err);
        }
        listOfMessages = _.flatten(listOfMessages).filter(Boolean);

        return callback(null, listOfMessages);
    });
}

指定された数のメッセージを含む配列が返されます

于 2020-03-18T03:29:33.857 に答える