1

を使用してワークフローを設計しようとしましたcomposite pattern。サンプル コードは次のようになります。

class CommandInterface(object):
    def __init__(self, name, template=None, tool=None, param : dict={},input=None, output=None ):
        self.name = name
        self.input = input
        self.output = output
        self.param = param
        self.template = template
    def before_invoke(self):
        pass   # check before invoke
    def invoke(self):
        pass
    def after_invoke(self):
        pass  # check after invoke
class ShellCommand(CommandInterface):    
    def render(self):
        cmd = self.template.format(input=self.input,
                                    output=self.output,
                                    param=self.param,
                                    )
        return cmd
    def before_invoke(self):
        pass
    def invoke(self):
        os.system(self.render())

class Workflow(CommandInterface):
    def __init__(self, name):
       self.name = name
       self._input = []
       self._output = []
       self._commands = []
       self._tool = []
    def add(self, command : CommandInterface):
        self._commands.append(command)
        return self

    def invoke(self):
        for command in self._commands:
            command.invoke()

そして、次のように使用できます。

workflow = Workflow()
raw_files = ["file1", "file2", "file3"]
for i in raw_files:
    head_command = ShellCommand("head {input} {output}", 
                                input = i, 
                                output = i+"_head")
    workflow.add(head_command)
merge_command = ShellCommand("cat {input} > {output}", 
                             input=" ".join([i+"_head" for i in rawfiles]), 
                             output="merged")
workflow.add(merge_command)

workflow.invoke()

ただし、コマンドの入力が別のコマンドの結果になる場合があります。たとえば、次のようになります。

class PythonCommand(CommandInterface):    
    def invoke(self):
        self._result = self.template(input=self.input, output=self.output, param=self.param)

def a_command(input, output, param):
    take_long_time(input, output, param)
    return {"a_result": "some result will be used in b_command"}

def b_command(input, output, param):
    def need_the_result_of_a_command(input, output, param):
        return param["a_result"]
    need_the_result_of_a_command(input, output, param)
    return "success"

a = PythonCommand(a_command, input_, output_, param_)
a.invoke()
b = PythonCommand(b_command, input_, output_, param= a._result)
b.invoke()

bの結果をパラメーターとして使用するため、ワークフローを使用してこれらの 2 つのコマンドを合成することはできませんa。これは、が呼び出された後にのみ表示されます。ただし、状況によっては、以前のコードのようなワークフロー マネージャーが依然として必要です。私はこのようなものが必要だと思います:

workflow = Workflow()
a = PythonCommand(a_command, input_, output_, param_)
workflow.add(a)
b = PythonCommand(b_command, input_, output_, param= {"a_result": a.lazy()._result})
workflow.add(b)
workflow.invoke()

.lazy()このような実装でワークフローを設計する方法についてアイデアを持っている人はいますか?

4

0 に答える 0