問題タブ [luigi]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
luigi - ルイージは、require メソッドにリストされているすべてのタスクを完了できません
次の依存構造を持つタスクがあるとします
子タスクはそれ自体で正常に実行されます。親は、すべての子タスクの完了ステータスを正しくチェックします。ただし、最初の子タスクが終了すると、スケジューラは親タスクを終了済みとしてマークします。次のメッセージが表示されます。
python - Pythonセロリでルイージを使用できますか
Web アプリケーションにセロリを使用しています。セロリは親タスクを実行し、さらにタスクのパイプラインを実行します
セロリの問題点
親タスクのステータスを確認するために luigi で取得した依存関係グラフとビジュアライザーを取得できません
Celery は、失敗したパイプラインを再開し、失敗した場所から開始するメカニズムを提供しません。
ルイージから簡単に入手できるこれらの 2 つのこと。
だから私は、セロリが親タスクを実行したら、そのタスク内でルイージパイプラインを実行すると考えていました。
つまり、 queuesize に基づいてセロリワーカーを自動スケーリングする必要があります。それは複数のマシンにまたがるルイージワーカーに影響しますか??
python - ルイージはファイルを S3 に直接書き込みます
Luigi でデータ パイプラインを作成しており、処理されたデータを S3 バケットに直接書き込もうとしています。私が使用したコードは次のとおりです。
スクリプトを実行した後、エラーが発生しました:
ファイルを S3 バケットに直接書き込むことはできますか?
python-2.7 - HiveThriftContext の get_partitions_by_filter コマンドの構文は何ですか?
ゴール
ハイブテーブルに部分的に指定されたパーティションが存在するかどうかを確認しようとしています。
詳細
sourceとdateの 2 つのパーティション キーを持つテーブルがあります。date
タスクを実行する前に、特定の(が指定されていない)のパーティションが存在するかどうかを確認する必要がsource
あります。
試み
これは、luigi の組み込みの Hive パーティション ターゲットとデフォルトのクライアントを使用して簡単に実行できます。
しかし、デフォルトのクライアントは、Hive のコマンドライン インスタンスをスピンアップしてクエリを実行しているため、非常に低速です。そこで、デフォルトのクライアントを中古のクライアントに交換しようとしたところ、次のことが起こりました。
2 つのクライアントは、部分的に指定されたパーティションを異なる方法で解釈しているようです。
MetastoreClient を継承し、過去に必要だったいくつかの機能を追加する独自のクライアントを既に作成しているので、独自の設計の部分的に指定されたパーティション チェックを追加してもかまいません。そして、クライアントには必要な機能があるようです:
get_partitions_by_filter
コマンドはまさに私が望んでいることを実行するように見えますが、それが期待する型の自動生成されたリストを除いて、どこにもドキュメントが見つかりません。そして、より単純な関数で同様の問題に遭遇しました。存在することがわかっているパーティションを完全に指定すると、それらを取得get_partition
またはget_partition_by_name
検索できません。これは、正しい形式で引数を提供していないためだと確信していますが、正しい形式が何であるかがわからず、推測に関して私の忍耐力が尽きました。
HiveThriftContext の get_partitions_by_filter コマンドの構文は何ですか?
フォローアップの質問: どうやってこれを理解しましたか?
python - ルイージで自動インスタンス化?
ではluigi.Task.run
、オブジェクトをファイル/データベース/などにシリアライズする必要があります。
ただし、便宜上、pd.read_csv(...)
タスクを再利用するときに同じインスタンス化手順を記述する必要があるため、スニペットをスキップしたいと思います。
このようにルイージで自動的にインスタンス化する方法はありますか?:
subprocess - エラー: luigi で例外がキャッチされませんでした (TypeError: None ではなく、文字列またはバッファーである必要があります)
/triggering Luigi Task を Python コードから呼び出しているときに問題が発生しました。
- 基本的に、コマンド ラインで行うのと同じように luigi タスクをトリガーする必要がありますが、Python コードから
- シェルコマンドを使用して luigi タスクを呼び出すために supbrocess.popen を使用しています
- test.py という名前のテスト コードがあり、モジュール task_scheduler.py にテスト クラスがあり、luigi タスクが含まれています (両方のモジュールが同じ場所/ディレクトリにあります)。
しかし、私はエラーが発生しています
誰かが私がここで間違っていることを教えてもらえますか? シェルプロンプトを使用すると、コマンド「python -m luigi --module task_scheduler TestClass」は完全に機能します