9

Amazon SWFを使用してサーバー間でメッセージを通信しますか?

  1. サーバーAIでスクリプトAを実行したい
  2. それが終わったら、スクリプトBを実行するためにサーバーBにメッセージを送信したいと思います
  3. 正常に完了したら、ワークフローキューからジョブをクリアしたい

これを行うためにBotoとSWFを組み合わせて使用​​する方法を理解するのに非常に苦労しています。私は完全なコードを求めているわけではありませんが、私が求めているのは、関係することについて誰かがもう少し説明できるかどうかです。

  • スクリプトAの完了を確認するようにサーバーBに実際に指示するにはどうすればよいですか?
  • サーバーAがスクリプトAの完了を取得して、スクリプトBを実行しようとしないようにするにはどうすればよいですか(サーバーBがこれを実行する必要があるため)。
  • スクリプトAの完了を実際にSWFに通知するにはどうすればよいですか?あなたは旗ですか、それともメッセージですか、それとも何ですか?

私はこれらすべてについてかなり混乱しています。どのデザインを使うべきですか?

4

4 に答える 4

17

SWFがサービスとしてどれほど役立つかを強調するいくつかの非常に良い質問をすると思います。つまり、サーバー間で作業を調整するようにサーバーに指示することはありません。あなたの決定者は、SWFサービスの助けを借りて、これらすべてをあなたのために調整します。

ワークフローの実装は次のようになります。

  1. ワークフローとそのアクティビティをサービスに登録します(1回限り)。
  2. 決定者と労働者を実装します。
  3. あなたの労働者と決定者を走らせましょう。
  4. 新しいワークフローを開始します。

クレデンシャルをboto.swfのコードにフィードする方法はいくつかあります。この演習では、以下のコードを実行する前に、それらを環境にエクスポートすることをお勧めします。

export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>

1)ドメイン、ワークフロー、およびアクティビティを登録するには、以下を実行します。

# ab_setup.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

2)決定者と労働者を実装して実行します。

# ab_decider.py
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class ABDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        # Print history to familiarize yourself with its format.
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]
            decisions = swf.Layer1Decisions()
            # Record latest non-decision event.
            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']
            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
                   ACTIVITY1, VERSION, task_list='a_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Take decision based on the name of activity that has just completed.
                # 1) Get activity's event id.
                last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                # 2) Extract its name.
                activity_data = history['events'][completed_activity_id]
                activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                activity_name = activity_attrs['activityType']['name']
                # 3) Optionally, get the result from the activity.
                result = last_event['activityTaskCompletedEventAttributes'].get('result')

                # Take the decision.
                if activity_name == ACTIVITY1:
                    # Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
                    decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
                        ACTIVITY2, VERSION, task_list='b_tasks', input=result)
                elif activity_name == ACTIVITY2:
                    # Server B completed activity. We're done.
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

ワーカーははるかに単純です。必要がなければ、継承を使用する必要はありません。

# ab_worker.py
import os
import time
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'

class MyBaseWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION
    task_list = None

    def run(self):
        activity_task = self.poll()
        print activity_task
        if 'activityId' in activity_task:
            # Get input.
            # Get the method for the requested activity.
            try:
                self.activity(activity_task.get('input'))
            except Exception, error:
                self.fail(reason=str(error))
                raise error

            return True

    def activity(self, activity_input):
        raise NotImplementedError

class WorkerA(MyBaseWorker):
    task_list = 'a_tasks'

    def activity(self, activity_input):
        result = str(time.time())
        print 'worker a reporting time: %s' % result
        self.complete(result=result)

class WorkerB(MyBaseWorker):
    task_list = 'b_tasks'

    def activity(self, activity_input):
        result = str(os.getpid())
        print 'worker b returning pid: %s' % result
        self.complete(result=result)

3)決定者と労働者を実行します。決定者とワーカーは、別々のホストから実行されている場合と、1つの同じマシンから実行されている場合があります。4つのターミナルを開き、アクターを実行します。

最初にあなたの決定者

$ python -i ab_decider.py 
>>> while ABDecider().run(): pass
... 

次に、ワーカーA、サーバーAからこれを実行できます。

$ python -i ab_workers.py 
>>> while WorkerA().run(): pass

次に、ワーカーB、おそらくサーバーBからですが、すべてをラップトップから実行すると、同様に機能します。

$ python -i ab_workers.py 
>>> while WorkerB().run(): pass
... 

4)最後に、ワークフローを開始します。

$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>> 

戻って、俳優に何が起こるかを確認してください。1分間操作がないと、サービスから切断される場合があります。その場合は、上矢印+ Enterキーを押して、ポーリングループに再度入ります。

これで、AWS管理コンソールのSWFパネルに移動し、実行がどのように行われているかを確認し、その履歴を表示できます。または、コマンドラインからクエリを実行することもできます。

>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
{'startToCloseTimeout': '300', 'taskList': {'name': ...

これは、アクティビティの連続実行を伴うワークフローの単なる例ですが、決定者がアクティビティの並列実行をスケジュールおよび調整することも可能です。

これで少なくとも始められることを願っています。シリアルワークフローのもう少し複雑な例については、これを確認することをお勧めします

于 2013-02-17T20:40:30.263 に答える
5

共有するサンプルコードはありませんが、SWFを使用して、2つのサーバー間でスクリプトの実行を調整できます。これに関する主なアイデアは、SWFと通信する3つのコードを作成することです。

  • 最初に実行するスクリプトと、その最初のスクリプトの実行が完了したら何をするかを知っているコンポーネント。これは、SWFの用語では「決定者」と呼ばれます。
  • 各マシンで実行する特定のスクリプトを実行する方法をそれぞれ理解する2つのコンポーネント。これらは、SWF用語では「アクティビティワーカー」と呼ばれます。

最初のコンポーネントであるディサイダーは、PollForDecisionTaskとRespondDecisionTaskCompletedの2つのSWFAPIを呼び出します。ポーリングリクエストは、決定コンポーネントに実行中のワークフローの現在の履歴、基本的にはスクリプトランナーの「whereami」状態情報を提供します。これらのイベントを調べて、どのスクリプトを実行する必要があるかを判断するコードを記述します。スクリプトを実行するためのこれらの「コマンド」は、RespondDecisionTaskCompletedへの呼び出しの一部として返されるアクティビティタスクのスケジューリングの形式になります。

作成する2番目のコンポーネントであるアクティビティワーカーは、それぞれ2つのSWF API(PollForActivityTaskとRespondActivityTaskCompleted)を呼び出します。ポーリング要求は、アクティビティワーカーに、知っているスクリプトを実行する必要があること、つまりSWFがアクティビティタスクと呼ぶものを示します。ポーリング要求からSWFに返される情報には、アクティビティタスクのスケジューリングの一部としてSWFに送信された単一の実行固有のデータを含めることができます。各サーバーは、そのホストでのローカルスクリプトの実行を示すために、アクティビティタスクについてSWFを個別にポーリングします。ワーカーがスクリプトの実行を完了すると、RespondActivityTaskCompletedAPIを介してSWFにコールバックします。

アクティビティワーカーからSWFへのコールバックにより、すでに説明したディサイダーコンポーネントに新しい履歴が渡されます。履歴を確認し、最初のスクリプトが完了したことを確認し、2番目のスクリプトの実行をスケジュールします。2番目の決定が完了したことを確認すると、別のタイプの決定を使用してワークフローを「閉じる」ことができます。

StartWorkflowExecution APIを呼び出すことにより、各ホストでスクリプトを実行するプロセス全体を開始します。これにより、SWFでプロセス全体のレコードが作成され、最初の履歴が決定プロセスに送られ、最初のホストでの最初のスクリプトの実行がスケジュールされます。

うまくいけば、これにより、SWFを使用してこのタイプのワークフローを実現する方法についてもう少しコンテキストが得られます。まだ読んでいない場合は、SWFページの開発ガイドで追加情報を確認します。

于 2013-02-14T18:33:31.267 に答える
1

SNS を使用できます。スクリプト A が完了すると、SNS がトリガーされ、サーバー B への通知がトリガーされます。

于 2013-02-12T10:05:37.263 に答える