3

私は amazon swf のフロー フレームワークを使用しており、優先度の高いワークフローの実行と通常のワークフローの実行を実行できるようにしたいと考えています。優先度の高いタスクがある場合、アクティビティは通常の優先度のタスクより優先度の高いタスクを優先する必要があります。これを達成するための最良の方法は何ですか?

以下がうまくいくかもしれないと思っていますが、より良い/推奨されるアプローチがあるのではないかと思います。

  1. アクティビティ用に 2 つのアクティビティ ワーカーと 2 つのアクティビティ リストを定義します。1 つの優先リストと 1 つの通常リスト。各ワーカーは同じアクティビティ クラスを使用します。
  2. 両方のワーカーが同じホスト (ec2 インスタンス) で実行されます。
  3. ワークフローでは、startNormalWorkflow と startHighWorkflow の 2 つのメソッドを定義します。startHighWorkflow メソッドでは、ActivitySchedulingOptions を使用して、タスクを優先度の高いリストに入れることができます。

このアプローチの問題は、優先度の高いタスクが通常のタスクの前にスケジュールされるという保証がないことです。

4

2 に答える 2

2

良い質問ですね。しばらく頭を悩ませました。

もちろん、この猫の皮を剥ぐ方法は複数あり、有効な解決策は数​​多くあります。ここでは、考えられる最も単純な方法、つまり、1 つのワークフロー内で優先順位に従ってタスクを実行する方法に焦点を当てました。

シナリオは次のようになります: 2 つのタスク リストを提供する 1 つのアクティビティ ワーカーを定義default_tasksurgent_tasks、単純なロジックを使用します。

  1. urgent_tasksリストに保留中のタスクがある場合は、そこから 1 つを選択します。
  2. それ以外の場合は、からタスクを選択しますdefault_tasks
  3. 選択したタスクを実行します。

問題は、保留中の優先度の高いタスクがあるかどうかを確認する方法です。CountPendingActivityTasks API が役に立ちます!

開発にFlowを使用していることは知っています。boto.swf.layer2Python はプロトタイピングが非常に簡単であるため、私の例は以下を使用して記述されていますが、考え方は同じままであり、優先度の高いワークフローと低いワークフローの実行を伴うより複雑なシナリオに拡張できます。

したがって、boto.swfを使用して上記を達成するには、次の手順に従います。

認証情報を環境にエクスポートする

$ export AWS_ACCESS_KEY_ID=your access key
$ export AWS_SECRET_ACCESS_KEY= your secret key 

コード スニペットを取得する

便宜上、github からフォークできます。

$ git clone git@github.com:oozie/stackoverflow.git
$ cd stackoverflow/amazon-swf/priority_tasks/

ドメインとワークフローをブートストラップするには:

# domain_setup.py 
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

swf.Domain(name=DOMAIN).register()
swf.ActivityType(domain=DOMAIN, name='SomeActivity', version=VERSION, task_list='default_tasks').register()
swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()

ディサイダーの実装:

# decider.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
ACTIVITY = 'SomeActivity'
VERSION = '1.0'

class MyWorkflowDecider(swf.Decider):

    domain = DOMAIN
    task_list = 'default_tasks'
    version = VERSION

    def run(self):
        history = self.poll()
        print history
        if 'events' in history:
            # Get a list of non-decision events to see what event came in last.
            workflow_events = [e for e in history['events']
                               if not e['eventType'].startswith('Decision')]

            decisions = swf.Layer1Decisions()

            last_event = workflow_events[-1]
            last_event_type = last_event['eventType']

            if last_event_type == 'WorkflowExecutionStarted':
                # At the start, get the worker to fetch the first assignment.
                decisions.schedule_activity_task(ACTIVITY+'1', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'2', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'3', ACTIVITY, VERSION, task_list='default_tasks')
                decisions.schedule_activity_task(ACTIVITY+'4', ACTIVITY, VERSION, task_list='urgent_tasks')
                decisions.schedule_activity_task(ACTIVITY+'5', ACTIVITY, VERSION, task_list='default_tasks')
            elif last_event_type == 'ActivityTaskCompleted':
                # Complete workflow execution after 5 completed activities.
                closed_activity_count = sum(1 for wf_event in workflow_events if wf_event.get('eventType') == 'ActivityTaskCompleted')
                if closed_activity_count == 5:
                    decisions.complete_workflow_execution()

            self.complete(decisions=decisions)
            return True

ワーカー実装の優先順位:

# worker.py
import boto.swf.layer2 as swf

DOMAIN = 'stackoverflow'
VERSION = '1.0'

class PrioritizingWorker(swf.ActivityWorker):

    domain = DOMAIN
    version = VERSION

    def run(self):

        urgent_task_count = swf.Domain(name=DOMAIN).count_pending_activity_tasks('urgent_tasks').get('count', 0)
        if urgent_task_count > 0:
            self.task_list = 'urgent_tasks'
        else:
            self.task_list = 'default_tasks'
        activity_task = self.poll()

        if 'activityId' in activity_task:
            print urgent_task_count, 'urgent tasks in the queue. Executing ' + activity_task.get('activityId')
            self.complete()
            return True

インタラクティブな Python シェルの 3 つのインスタンスからワークフローを実行する

ディサイダーを実行します。

$ python -i decider.py
>>> while MyWorkflowDecider().run(): pass
... 

実行を開始します。

$ python -i decider.py 
>>> swf.WorkflowType(domain='stackoverflow', name='MyWorkflow', version='1.0', task_list='default_tasks').start()

最後に、ワーカーを開始して、実行中のタスクを監視します。

$ python -i worker.py 
>>> while PrioritizingWorker().run(): pass
... 
2 urgent tasks in the queue. Executing SomeActivity2
1 urgent tasks in the queue. Executing SomeActivity4
0 urgent tasks in the queue. Executing SomeActivity5
0 urgent tasks in the queue. Executing SomeActivity1
0 urgent tasks in the queue. Executing SomeActivity3
于 2013-09-13T22:27:31.093 に答える