20

私は、botoを使用してSQSでメッセージを渡すことによってワークフローが管理されるアプリケーションに取り組んでいます。

SQSキューは徐々に大きくなっており、含まれているはずの要素の数を確認する方法がありません。

これで、キューを定期的にポーリングし、固定サイズの要素セットがあるかどうかを確認するデーモンができました。たとえば、次の「キュー」について考えてみます。

q = ["msg1_comp1", "msg2_comp1", "msg1_comp2", "msg3_comp1", "msg2_comp2"]

ここで、ある時点で「msg1_comp1」、「msg2_comp1」、「msg3_comp1」が一緒にキューにあるかどうかを確認したいのですが、キューのサイズがわかりません。

APIを調べた後、キュー内の要素を1つだけ取得するか、固定数の要素を取得できるようですが、すべてではありません。

>>> rs = q.get_messages()
>>> len(rs)
1
>>> rs = q.get_messages(10)
>>> len(rs)
10

回答で提案されている提案は、たとえば、何も返されないまでループで10個のメッセージを取得することですが、SQSのメッセージには可視性タイムアウトがあります。つまり、キューから要素をポーリングしても、実際には削除されません。それらは短期間しか見えなくなります。

キューにあるすべてのメッセージを、いくつあるかを知らなくても取得する簡単な方法はありますか?

4

6 に答える 6

23

q.get_messages(n)whileループの内側に呼び出しを置きます。

all_messages=[]
rs=q.get_messages(10)
while len(rs)>0:
    all_messages.extend(rs)
    rs=q.get_messages(10)

さらに、dumpは10を超えるメッセージもサポートしません。

def dump(self, file_name, page_size=10, vtimeout=10, sep='\n'):
    """Utility function to dump the messages in a queue to a file
    NOTE: Page size must be < 10 else SQS errors"""
于 2012-04-16T20:06:24.347 に答える
22

AWS SQSキューを使用して即時通知を提供しているため、すべてのメッセージをリアルタイムで処理する必要があります。次のコードは、(すべての)メッセージを効率的にデキューし、削除するときにエラーを処理するのに役立ちます。

注:キューからメッセージを削除するには、メッセージを削除する必要があります。更新されたboto3AWSpython SDK、jsonライブラリ、および次のデフォルト値を使用しています。

import boto3
import json

region_name = 'us-east-1'
queue_name = 'example-queue-12345'
max_queue_messages = 10
message_bodies = []
aws_access_key_id = '<YOUR AWS ACCESS KEY ID>'
aws_secret_access_key = '<YOUR AWS SECRET ACCESS KEY>'
sqs = boto3.resource('sqs', region_name=region_name,
        aws_access_key_id=aws_access_key_id,
        aws_secret_access_key=aws_secret_access_key)
queue = sqs.get_queue_by_name(QueueName=queue_name)
while True:
    messages_to_delete = []
    for message in queue.receive_messages(
            MaxNumberOfMessages=max_queue_messages):
        # process message body
        body = json.loads(message.body)
        message_bodies.append(body)
        # add message to delete
        messages_to_delete.append({
            'Id': message.message_id,
            'ReceiptHandle': message.receipt_handle
        })

    # if you don't receive any notifications the
    # messages_to_delete list will be empty
    if len(messages_to_delete) == 0:
        break
    # delete messages to remove them from SQS queue
    # handle any errors
    else:
        delete_response = queue.delete_messages(
                Entries=messages_to_delete)
于 2015-10-15T18:36:27.380 に答える
8

私の理解では、SQSサービスの分散性により、設計がほとんど機能しなくなります。get_messagesを呼び出すたびに、異なるサーバーのセットと通信します。サーバーには、すべてではありませんが一部のメッセージが含まれます。したがって、「時々チェックイン」して特定のメッセージグループの準備ができているかどうかを設定し、それらを受け入れることはできません。

あなたがする必要があるのは、継続的にポーリングし、到着したすべてのメッセージを受け取り、それらを独自のデータ構造にローカルに保存することです。フェッチが成功するたびに、データ構造をチェックして、メッセージの完全なセットが収集されているかどうかを確認できます。

削除はすべてのSQSサーバーに伝播する必要があるため、メッセージは順不同で到着し、一部のメッセージは2回配信されることに注意してください。ただし、後続のgetリクエストが削除メッセージを打ち負かすことがあります

于 2012-04-21T23:11:19.517 に答える
3

これをcronジョブで実行します

from django.core.mail import EmailMessage
from django.conf import settings
import boto3
import json

sqs = boto3.resource('sqs', aws_access_key_id=settings.AWS_ACCESS_KEY_ID,
         aws_secret_access_key=settings.AWS_SECRET_ACCESS_KEY,
         region_name=settings.AWS_REGION)

queue = sqs.get_queue_by_name(QueueName='email')
messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)

while len(messages) > 0:
    for message in messages:
        mail_body = json.loads(message.body)
        print("E-mail sent to: %s" % mail_body['to'])
        email = EmailMessage(mail_body['subject'], mail_body['message'], to=[mail_body['to']])
        email.send()
        message.delete()

    messages = queue.receive_messages(MaxNumberOfMessages=10, WaitTimeSeconds=1)
于 2017-06-21T15:02:40.893 に答える
0

注:これは、質問に対する直接の回答を意図したものではありません。 むしろ、エンドユーザーがパッケージ(別名Boto2)を使用していると仮定すると、 @TimothyLiuの回答を補強するものです。このコードは、彼の回答で言及されている呼び出しの「Boto-2-ization」です。BotoBoto3delete_messages


where のBoto(2)呼び出しは、次の値に対応するkey :valueを持つオブジェクトです。delete_message_batch(messages_to_delete)messages_to_deletedictidreceipt_handle

AttributeError:'dict'オブジェクトには属性'id'がありません。

クラスオブジェクトdelete_message_batchを期待しているようです。一度に10を超える「メッセージ」を削除する場合、 BotoソースをMessageコピーして非オブジェクト(ala boto3 )を使用できるようにすることも失敗します。そのため、次の回避策を使用する必要がありました。delete_message_batchMessage

ここからコードを印刷

from __future__ import print_function
import sys
from itertools import islice

def eprint(*args, **kwargs):
    print(*args, file=sys.stderr, **kwargs)

@static_vars(counter=0)
def take(n, iterable, reset=False):
    "Return next n items of the iterable as same type"
    if reset: take.counter = 0
    take.counter += n
    bob = islice(iterable, take.counter-n, take.counter)
    if isinstance(iterable, dict): return dict(bob)
    elif isinstance(iterable, list): return list(bob)
    elif isinstance(iterable, tuple): return tuple(bob)
    elif isinstance(iterable, set): return set(bob)
    elif isinstance(iterable, file): return file(bob)
    else: return bob

def delete_message_batch2(cx, queue, messages): #returns a string reflecting level of success rather than throwing an exception or True/False
  """
  Deletes a list of messages from a queue in a single request.
  :param cx: A boto connection object.
  :param queue: The :class:`boto.sqs.queue.Queue` from which the messages will be deleted
  :param messages: List of any object or structure with id and receipt_handle attributes such as :class:`boto.sqs.message.Message` objects.
  """
  listof10s = []
  asSuc, asErr, acS, acE = "","",0,0
  res = []
  it = tuple(enumerate(messages))
  params = {}
  tenmsg = take(10,it,True)
  while len(tenmsg)>0:
    listof10s.append(tenmsg)
    tenmsg = take(10,it)
  while len(listof10s)>0:
    tenmsg = listof10s.pop()
    params.clear()
    for i, msg in tenmsg: #enumerate(tenmsg):
      prefix = 'DeleteMessageBatchRequestEntry'
      numb = (i%10)+1
      p_name = '%s.%i.Id' % (prefix, numb)
      params[p_name] = msg.get('id')
      p_name = '%s.%i.ReceiptHandle' % (prefix, numb)
      params[p_name] = msg.get('receipt_handle')
    try:
      go = cx.get_object('DeleteMessageBatch', params, BatchResults, queue.id, verb='POST')
      (sSuc,cS),(sErr,cE) = tup_result_messages(go)
      if cS:
        asSuc += ","+sSuc
        acS += cS
      if cE:
        asErr += ","+sErr
        acE += cE
    except cx.ResponseError:
      eprint("Error in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
    except:
      eprint("Error of unknown type in batch delete for queue {}({})\nParams ({}) list: {} ".format(queue.name, queue.id, len(params), params))
  return stringify_final_tup(asSuc, asErr, acS, acE, expect=len(messages)) #mdel #res

def stringify_final_tup(sSuc="", sErr="", cS=0, cE=0, expect=0):
  if sSuc == "": sSuc="None"
  if sErr == "": sErr="None"
  if cS == expect: sSuc="All"
  if cE == expect: sErr="All"
  return "Up to {} messages removed [{}]\t\tMessages remaining ({}) [{}]".format(cS,sSuc,cE,sErr)
于 2016-11-16T17:10:12.503 に答える
-1

以下のコードのようなものでうまくいくはずです。申し訳ありませんが、C#ですが、Pythonに変換するのは難しいことではありません。辞書は、重複を取り除くために使用されます。

    public Dictionary<string, Message> GetAllMessages(int pollSeconds)
    {
        var msgs = new Dictionary<string, Message>();
        var end = DateTime.Now.AddSeconds(pollSeconds);

        while (DateTime.Now <= end)
        {
            var request = new ReceiveMessageRequest(Url);
            request.MaxNumberOfMessages = 10;

            var response = GetClient().ReceiveMessage(request);

            foreach (var msg in response.Messages)
            {
                if (!msgs.ContainsKey(msg.MessageId))
                {
                    msgs.Add(msg.MessageId, msg);
                }
            }
        }

        return msgs;
    }
于 2015-02-07T00:31:21.427 に答える