SWFがサービスとしてどれほど役立つかを強調するいくつかの非常に良い質問をすると思います。つまり、サーバー間で作業を調整するようにサーバーに指示することはありません。あなたの決定者は、SWFサービスの助けを借りて、これらすべてをあなたのために調整します。
ワークフローの実装は次のようになります。
- ワークフローとそのアクティビティをサービスに登録します(1回限り)。
- 決定者と労働者を実装します。
- あなたの労働者と決定者を走らせましょう。
- 新しいワークフローを開始します。
クレデンシャルをboto.swfのコードにフィードする方法はいくつかあります。この演習では、以下のコードを実行する前に、それらを環境にエクスポートすることをお勧めします。
export AWS_ACCESS_KEY_ID=<your access key>
export AWS_SECRET_ACCESS_KEY=<your secret key>
1)ドメイン、ワークフロー、およびアクティビティを登録するには、以下を実行します。
# ab_setup.py
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
2)決定者と労働者を実装して実行します。
# ab_decider.py
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class ABDecider(swf.Decider):
domain = DOMAIN
task_list = 'default_tasks'
version = VERSION
def run(self):
history = self.poll()
# Print history to familiarize yourself with its format.
print history
if 'events' in history:
# Get a list of non-decision events to see what event came in last.
workflow_events = [e for e in history['events']
if not e['eventType'].startswith('Decision')]
decisions = swf.Layer1Decisions()
# Record latest non-decision event.
last_event = workflow_events[-1]
last_event_type = last_event['eventType']
if last_event_type == 'WorkflowExecutionStarted':
# At the start, get the worker to fetch the first assignment.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
ACTIVITY1, VERSION, task_list='a_tasks')
elif last_event_type == 'ActivityTaskCompleted':
# Take decision based on the name of activity that has just completed.
# 1) Get activity's event id.
last_event_attrs = last_event['activityTaskCompletedEventAttributes']
completed_activity_id = last_event_attrs['scheduledEventId'] - 1
# 2) Extract its name.
activity_data = history['events'][completed_activity_id]
activity_attrs = activity_data['activityTaskScheduledEventAttributes']
activity_name = activity_attrs['activityType']['name']
# 3) Optionally, get the result from the activity.
result = last_event['activityTaskCompletedEventAttributes'].get('result')
# Take the decision.
if activity_name == ACTIVITY1:
# Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
ACTIVITY2, VERSION, task_list='b_tasks', input=result)
elif activity_name == ACTIVITY2:
# Server B completed activity. We're done.
decisions.complete_workflow_execution()
self.complete(decisions=decisions)
return True
ワーカーははるかに単純です。必要がなければ、継承を使用する必要はありません。
# ab_worker.py
import os
import time
import boto.swf.layer2 as swf
DOMAIN = 'stackoverflow'
ACTIVITY1 = 'ServerAActivity'
ACTIVITY2 = 'ServerBActivity'
VERSION = '1.0'
class MyBaseWorker(swf.ActivityWorker):
domain = DOMAIN
version = VERSION
task_list = None
def run(self):
activity_task = self.poll()
print activity_task
if 'activityId' in activity_task:
# Get input.
# Get the method for the requested activity.
try:
self.activity(activity_task.get('input'))
except Exception, error:
self.fail(reason=str(error))
raise error
return True
def activity(self, activity_input):
raise NotImplementedError
class WorkerA(MyBaseWorker):
task_list = 'a_tasks'
def activity(self, activity_input):
result = str(time.time())
print 'worker a reporting time: %s' % result
self.complete(result=result)
class WorkerB(MyBaseWorker):
task_list = 'b_tasks'
def activity(self, activity_input):
result = str(os.getpid())
print 'worker b returning pid: %s' % result
self.complete(result=result)
3)決定者と労働者を実行します。決定者とワーカーは、別々のホストから実行されている場合と、1つの同じマシンから実行されている場合があります。4つのターミナルを開き、アクターを実行します。
最初にあなたの決定者
$ python -i ab_decider.py
>>> while ABDecider().run(): pass
...
次に、ワーカーA、サーバーAからこれを実行できます。
$ python -i ab_workers.py
>>> while WorkerA().run(): pass
次に、ワーカーB、おそらくサーバーBからですが、すべてをラップトップから実行すると、同様に機能します。
$ python -i ab_workers.py
>>> while WorkerB().run(): pass
...
4)最後に、ワークフローを開始します。
$ python
Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41)
[GCC 4.4.3] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import boto.swf.layer2 as swf
>>> workflows = swf.Domain(name='stackoverflow').workflows()
>>> workflows
[<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
>>> execution = workflows[0].start(task_list='default_tasks')
>>>
戻って、俳優に何が起こるかを確認してください。1分間操作がないと、サービスから切断される場合があります。その場合は、上矢印+ Enterキーを押して、ポーリングループに再度入ります。
これで、AWS管理コンソールのSWFパネルに移動し、実行がどのように行われているかを確認し、その履歴を表示できます。または、コマンドラインからクエリを実行することもできます。
>>> execution.history()
[{'eventId': 1, 'eventType': 'WorkflowExecutionStarted',
'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'},
'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy':
'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version':
'1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2,
'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes':
{'startToCloseTimeout': '300', 'taskList': {'name': ...
これは、アクティビティの連続実行を伴うワークフローの単なる例ですが、決定者がアクティビティの並列実行をスケジュールおよび調整することも可能です。
これで少なくとも始められることを願っています。シリアルワークフローのもう少し複雑な例については、これを確認することをお勧めします。