1

EDIT:これは基本的に「Javaでデータフローエンジンを適切に実装する方法」の質問であり、これは単一の回答では適切に答えられないと感じています(「ORMレイヤーを適切に実装する方法」を尋ねて、誰かにHibernate などの詳細を書き出してください)、この質問は「閉じた」と考えてください。

Javaで動的データフローをモデル化するエレガントな方法はありますか? データフローとは、さまざまな種類のタスクが存在することを意味し、これらのタスクは任意に「接続」できます。たとえば、タスクが終了すると、終了したタスクの出力を入力として後続のタスクが並列に実行されたり、複数のタスクが終了したときにそれらのタスクが実行されたりします。出力は後続タスクに集約されます (フローベースのプログラミングを参照)。動的とは、タスクが終了したときの後続タスクのタイプと数が、その終了したタスクの出力に依存することを意味します。たとえば、タスク A は、特定の出力がある場合はタスク B を生成する可能性がありますが、ある出力がある場合はタスク C を生成する可能性があります。別の出力。別の言い方をすれば、各タスク (または一連のタスク) が次のタスクを決定する責任があるということです。

Web ページをレンダリングするためのサンプル データフロー: タスクの種類として、ファイル ダウンローダー、HTML/CSS レンダラー、HTML パーサー/DOM ビルダー、画像レンダラー、JavaScript パーサー、JavaScript インタープリターがあります。

  • HTML ファイルのファイル ダウンローダ タスク
    • HTML パーサー/DOM ビルダー タスク
      • 各埋め込みファイル/リンクのファイル ダウンローダ タスク
        • 画像の場合、画像レンダラー
        • 外部 JavaScript の場合、JavaScript パーサー
          • JavaScript インタープリター
        • それ以外の場合は、HTML パーサー タスクの一部の var/field に格納するだけです
      • 各埋め込みスクリプトの JavaScript パーサー
        • JavaScript インタープリター
      • 上記のタスクが完了するのを待ってから、HTML/CSS レンダラー (明らかに最適または完全に正しいわけではありませんが、これは簡単です)

私は、ソリューションが何らかの包括的なフレームワークである必要があると言っているわけではありません (実際、JDK API に近いほど良いです)。重量級のものは、Spring Web Flow や宣言型マークアップ、その他の DSL などとはまったく異なります。 .

より具体的に言うと、Callables、Executors、ExecutorCompletionServices、およびおそらくさまざまなシンクロナイザー クラス (Semaphore や CountDownLatch など) を使用して Java でこれをモデル化する良い方法を考えようとしています。いくつかの使用例と要件があります。

  1. タスクが実行されるエグゼキューターについて、仮定をしないでください。実際、簡単にするために、エグゼキュータが 1 つしかないと仮定します。これは固定スレッド プールのエグゼキュータである可能性があるため、単純な実装ではデッドロックが発生する可能性があります (たとえば、別のタスクをサブミットし、そのサブタスクが完了するまでブロックするタスクを想像してください。これらのタスクのいくつかがすべてのスレッドを使い果たしていると想像してください)。
  2. 簡単にするために、データがタスク間でストリーミングされないと仮定します (タスク出力 -> 後続タスク入力)。終了タスクと後続タスクは一緒に存在する必要がないため、後続タスクへの入力データは、前のタスク (既に完了しているため)。
  3. データフロー「エンジン」が処理できる操作は、次の 2 つだけです。
    1. タスクがより多くのタスクをキューに入れることができるメカニズム
    2. 必要なすべての入力タスクが完了するまで、後続タスクがキューに入れられないメカニズム
    3. フローが終了するまでメイン スレッド (またはエグゼキュータによって管理されていない他のスレッド) がブロックされるメカニズム
    4. 特定のタスクが完了するまでメイン スレッド (またはエグゼキュータによって管理されていない他のスレッド) がブロックされるメカニズム
  4. データフローは動的 (タスクの入力/状態に依存) であるため、これらのメカニズムのアクティブ化はタスク コード内で発生する必要があります。
  5. データフローの「内部」は、タスク (Callable) 自体に公開しないでください。上記の操作のみをタスクで使用できるようにする必要があります。
  6. データのタイプは、すべてのタスクで必ずしも同じではないことに注意してください。たとえば、ファイル ダウンロード タスクは、入力としてファイルを受け入れますが、文字列を出力します。
  7. タスクがキャッチされていない例外 (すべてのデータフロー処理を停止する必要がある致命的なエラーを示す) をスローした場合、できるだけ早くデータフローを開始したスレッドまで伝播し、すべてのタスク (または致命的なエラー ハンドラーのようなより手の込んだもの) をキャンセルする必要があります。
  8. タスクはできるだけ早く開始する必要があります。これは、前の要件とともに、単純な Future ポーリング + Thread.sleep() を排除する必要があります。
  9. おまけとして、タスクが終了するたびに、または最後のタスクが終了してから X 時間が経過したときに、データフロー エンジン自体に何らかのアクション (ログ記録など) を実行させたいと考えています。何かのようなもの:ExecutorCompletionService<T> ecs; while (hasTasks()) { Future<T> future = ecs.poll(1 minute); some_action_like_logging(); if (future != null) { future.get() ... } ... }

Java 同時実行 API を使用してこれらすべてを行う簡単な方法はありますか? または、JDK で使用できるものを使用しても複雑になる場合、要件を満たす軽量のライブラリはありますか? 私はすでに、私の特定のユース ケースに適合する部分的なソリューションを持っています (私は 2 つのエグゼキューターを使用しているため、ある意味でごまかしています。ご存じのとおり、上記の Web ブラウザーの例とはまったく関係ありません)。より汎用的で洗練されたソリューションを見たいと思っています。

4

2 に答える 2

1

次のようなインターフェイスを定義するのはどうですか:

interface Task extends Callable {
  boolean isReady();
}

あなたの「データフローエンジン」は、タスクオブジェクトのコレクションを管理するだけで済みます。つまり、新しいタスクオブジェクトを実行のためにキューに入れ、特定のタスクのステータスに関するクエリを許可します(したがって、上記のインターフェースを拡張してidおよび/を含める必要があるかもしれませんまたは入力します)。タスクが完了すると (そしてもちろんエンジンが開始されると)、エンジンは開始されていないタスクを照会して、準備ができているかどうかを確認し、準備ができている場合はそれらを渡してエグゼキューターで実行する必要があります。あなたが言及したように、ロギングなども行うことができます。

役立つ可能性があるもう 1 つのことは、Guice ( http://code.google.com/p/google-guice/ ) または同様の軽量 DI フレームワークを使用して、すべてのオブジェクトを正しく接続することです (たとえば、正しいエグゼキュータが確実に実行されるようにするため)。タイプが作成され、複雑な循環関係を導入することなく、データフロー エンジンへのアクセスが必要なタスク (たとえば、isReady メソッドや他のタスクのキューイングなど) にインスタンスを提供できるようにします。

HTH ですが、重要な側面を見逃している場合はコメントしてください... ポール。

于 2010-05-19T11:32:14.337 に答える