/triggering Luigi Task を Python コードから呼び出しているときに問題が発生しました。
- 基本的に、コマンド ラインで行うのと同じように luigi タスクをトリガーする必要がありますが、Python コードから
- シェルコマンドを使用して luigi タスクを呼び出すために supbrocess.popen を使用しています
- test.py という名前のテスト コードがあり、モジュール task_scheduler.py にテスト クラスがあり、luigi タスクが含まれています (両方のモジュールが同じ場所/ディレクトリにあります)。
import luigi
class TestClass(luigi.Task):
# param = luigi.DictParameter(default=dict())
def requires(self):
print "I am TestClass req"
def run(self):
with open('myfile.txt', 'w') as f:
f.write("asasasas")
print "I am TestClass run"
import subprocess
p = subprocess.Popen("python -m luigi --module task_scheduler TestClass", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
print p.pid
(output, err) = p.communicate()
print "-------------O/P-------------"
print output
print "-------------error-------------"
print err
しかし、私はエラーが発生しています
52688
-------------O/P-------------
-------------error-------------
ERROR: Uncaught exception in luigi
Traceback (most recent call last):
File "/Library/Python/2.7/site-packages/luigi/retcodes.py", line 61, in run_with_retcodes
worker = luigi.interface._run(argv)['worker']
File "/Library/Python/2.7/site-packages/luigi/interface.py", line 238, in _run
return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
File "/Library/Python/2.7/site-packages/luigi/interface.py", line 172, in _schedule_and_run
not(lock.acquire_for(env_params.lock_pid_dir, env_params.lock_size, kill_signal))):
File "/Library/Python/2.7/site-packages/luigi/lock.py", line 82, in acquire_for
my_pid, my_cmd, pid_file = get_info(pid_dir)
File "/Library/Python/2.7/site-packages/luigi/lock.py", line 67, in get_info
pid_file = os.path.join(pid_dir, hashlib.md5(cmd_hash).hexdigest()) + '.pid'
TypeError: must be string or buffer, not None
誰かが私がここで間違っていることを教えてもらえますか? シェルプロンプトを使用すると、コマンド「python -m luigi --module task_scheduler TestClass」は完全に機能します