7

Amazon は将来の開発のために boto3 を推進していますが、新しい boto3 に関する十分なドキュメントを提供していません。

共有したい boto3 で SWF を使用するサンプル コードはありますか?

4

2 に答える 2

16

これは私がこれまでに見つけた唯一の例です:

https://github.com/jhludwig/aws-swf-boto3

プロセスの概要は次のようになります (これは上記のリンクから直接引用されていますが、いくつかの追加のメモとフローが追加されていることに注意してください)。

SWF は物の名前で動作することに注意してください。これらの名前に実行の意味を与えるのはコード次第です。たとえば、Decider意志をポーリングし、タスク名を使用して次の処理を決定します。

私が正確に確信していないいくつかのこと。TASKLIST私が信じている参照は、一種の名前空間です。それは実際には物事のリストではなく、名前で物事を分離することに関するものです。今、私はそれについて完全に間違っている可能性があります.私の基本的な理解から、それは私が言っていることだと思います.

どこからでもディサイダーとワーカーを実行できます。彼らは AWS に到達するため、ファイアウォールで 0.0.0.0/0 の送信が許可されていれば、アクセスできます。

AWS Docs には、ラムダを実行できることも記載されていますが、それをトリガーする方法がわかりません。

boto3 swf クライアントを作成します。

import boto3
from botocore.exceptions import ClientError

swf = boto3.client('swf')

ドメインを作成する

try:
  swf.register_domain(
    name=<DOMAIN>,
    description="Test SWF domain",
    workflowExecutionRetentionPeriodInDays="10" # keep history for this long
  )
except ClientError as e:
    print "Domain already exists: ", e.response.get("Error", {}).get("Code")

ドメインを作成したら、ワークフローを登録します。

ワークフローの登録

try:
  swf.register_workflow_type(
    domain=DOMAIN, # string
    name=WORKFLOW, # string
    version=VERSION, # string
    description="Test workflow",
    defaultExecutionStartToCloseTimeout="250",
    defaultTaskStartToCloseTimeout="NONE",
    defaultChildPolicy="TERMINATE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print "Test workflow created!"
except ClientError as e:
  print "Workflow already exists: ", e.response.get("Error", {}).get("Code")

ワークフローが登録されたので、タスクの割り当てを開始できます。

タスクをワークフローに割り当てます。

N 個のタスクを割り当てることができます。これらは主に文字列であり、コードによって実行の意味が与えられることに注意してください。

try:
  swf.register_activity_type(
    domain=DOMAIN,
    name="DoSomething",
    version=VERSION, # string
    description="This is a worker that does something",
    defaultTaskStartToCloseTimeout="NONE",
    defaultTaskList={"name": TASKLIST } # TASKLIST is a string
  )
  print "Worker created!"
except ClientError as e:
  print "Activity already exists: ", e.response.get("Error", {}).get("Code")

送信 ワークフローの開始

ドメイン、ワークフロー、およびタスクが作成されたので、ワークフローを開始できます。

import boto3

swf = boto3.client('swf')

response = swf.start_workflow_execution(
  domain=DOMAIN # string,
  workflowId='test-1001',
  workflowType={
    "name": WORKFLOW,# string
    "version": VERSION # string
  },
  taskList={
      'name': TASKLIST
  },
  input=''
)

print "Workflow requested: ", response

に注意してくださいworkflowId。これはカスタム識別子です (例: ) str(uuid.uuid4())。ドキュメントから:

ワークフローの実行に関連付けられたユーザー定義の識別子。これを使用して、カスタム識別子をワークフロー実行に関連付けることができます。ワークフローの実行が論理的に前回の実行の再開である場合は、同じ識別子を指定できます。同じワークフロー ID を持つ 2 つのワークフロー実行を同時に開くことはできません。

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.start_workflow_execution

この時点では、Deciderrunning も も何もないため、何も起こりませんWorkers。それらがどのように見えるか見てみましょう。

ディサイダー

ディサイダーは、決定タスクを取得するためにポーリングして、以下について決定します。

import boto3
from botocore.client import Config
import uuid

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

上記のタイムアウト設定に注意してください。この PR を参照して、その背後にある理論的根拠を確認できます。

https://github.com/boto/botocore/pull/634

Boto3 SWF ドキュメントから:

ワーカーは、クライアント側のソケット タイムアウトを少なくとも 70 秒に設定する必要があります (サービスがポーリング リクエストを保持できる最大時間よりも 10 秒長くなります)。

その PR により、boto3 はその機能を実行できるようになりました。

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_decision_task

print "Listening for Decision Tasks"

while True:

  newTask = swf.poll_for_decision_task(
    domain=DOMAIN ,
    taskList={'name': TASKLIST }, # TASKLIST is a string
    identity='decider-1', # any identity you would like to provide, it's recorded in the history
    reverseOrder=False)

  if 'taskToken' not in newTask:
    print "Poll timed out, no new task.  Repoll"

  elif 'events' in newTask:

    eventHistory = [evt for evt in newTask['events'] if not evt['eventType'].startswith('Decision')]
    lastEvent = eventHistory[-1]

    if lastEvent['eventType'] == 'WorkflowExecutionStarted':
      print "Dispatching task to worker", newTask['workflowExecution'], newTask['workflowType']
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'ScheduleActivityTask',
            'scheduleActivityTaskDecisionAttributes': {
                'activityType':{
                    'name': TASKNAME, # string
                    'version': VERSION # string
                    },
                'activityId': 'activityid-' + str(uuid.uuid4()),
                'input': '',
                'scheduleToCloseTimeout': 'NONE',
                'scheduleToStartTimeout': 'NONE',
                'startToCloseTimeout': 'NONE',
                'heartbeatTimeout': 'NONE',
                'taskList': {'name': TASKLIST}, # TASKLIST is a string
            }
          }
        ]
      )
      print "Task Dispatched:", newTask['taskToken']

    elif lastEvent['eventType'] == 'ActivityTaskCompleted':
      swf.respond_decision_task_completed(
        taskToken=newTask['taskToken'],
        decisions=[
          {
            'decisionType': 'CompleteWorkflowExecution',
            'completeWorkflowExecutionDecisionAttributes': {
              'result': 'success'
            }
          }
        ]
      )
      print "Task Completed!"

このスニペットの最後で、完了したかどうかを確認し、完了したことを SWF に通知ActivityTaskCompletedするという決定で応答することに注意してください。CompleteWorkflowExecution

それは決定者です、労働者はどのように見えますか?

ワーカー

http://boto3.readthedocs.org/en/latest/reference/services/swf.html#SWF.Client.poll_for_activity_task

繰り返しますが、read_timeout

import boto3
from botocore.client import Config

botoConfig = Config(connect_timeout=50, read_timeout=70)
swf = boto3.client('swf', config=botoConfig)

次に、ワーカーのポーリングを開始します。

print "Listening for Worker Tasks"

while True:

  task = swf.poll_for_activity_task(
    domain=DOMAIN,# string
    taskList={'name': TASKLIST}, # TASKLIST is a string
    identity='worker-1') # identity is for our history

  if 'taskToken' not in task:
    print "Poll timed out, no new task.  Repoll"

  else:
    print "New task arrived"

    swf.respond_activity_task_completed(
        taskToken=task['taskToken'],
        result='success'
    )

    print "Task Done"

ここでも、作業が完了したことを SWF に通知します。

于 2016-03-22T18:25:20.500 に答える
1

公式ドキュメントへのリンクは [こちら][1] です。

リンクまたは[これ][2]をたどるだけで、多くのコードサンプルが公開されています。利用可能なサービス セクションの下に、boto3 が現在サポートしているすべてのサービスと詳細な例が記載されています。

例のいくつかは次のとおりです: boto3 と SWF の実行カウントの取得

import boto3
import datetime
import time
import dateutil.tz

def lambda_handler(event,context):
    swfClient = boto3.client('swf')
    currentTimeZone = dateutil.tz.gettz('Australia/Brisbane')
    latestDate = datetime.datetime.now(tz=currentTimeZone)
    oldestDate = latestDate - datetime.timedelta(1)

    fullTextPreloadResponse = swfClient.count_open_workflow_executions(
         domain=domainName,
         startTimeFilter={
             'oldestDate': oldestDate,
             'latestDate': latestDate
         },
         typeFilter={
             'name': 'NAME_OF_YOUR_SWF_WORKFLOW_NAME',
             'version': 'VERSION_NUMBER'
         }
     )
     print("the count is " + str(fullTextResponse['count']))
     print(fullTextResponse)

これは、私の場合、実行中の SWF ワークフロー タイプの数を取得するために使用したものです。私が使用した形式は、上記のドキュメントで明確に定義されています。

boto3 と SWF を簡単に併用するには、Python のラムダ関数に boto3 をインポートすることから始めます。次に、python DateTime が追加されています。次に、boto3.client は | を使用できる場所からクライアントを設定します。SWF と対話します。

他の例は次のとおりです。

history = swf.get_workflow_execution_history(
            domain= domainName,
            execution={
                'workflowId': workflowId,
                'runId': runId
            },
        )

これがあなたを助けることを願っています![1]: https://boto3.amazonaws.com/v1/documentation/api/latest/index.html [2]: https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services /index.html

于 2020-07-06T11:25:58.340 に答える