6

現在、次の問題に 1 週​​間苦労しており、アドバイスが必要です。

def query(title: String): List[Search]   // query("Terminator") => ["Terminator I", "Terminator II", "Terminator 1984", etc...]

def searchIMDB(s: Search): List[SearchResult]
def searchTMDB(s: Search): List[SearchResult]

def filterRedundantSearchResults(sr: SearchResult): Option[SearchResult]

def fetchIMDB(sr: SearchResult): List[MetaInfo]
def fetchTMDB(sr: SearchResult): List[MetaInfo]

def consolidate(infos: List[MetaInfo]): List[List[MetaInfo]]

次のようなパイプラインを構築したい:

query("Terminator")
-> [askIMDB, askTMDB, ...]
-> filterRedundantSearchResults (already-searched-state per query)
-> [fetchIMDB, fetchTMDB, ...]
-> consolidate                  (collected-meta-infos-state per query)
   => List[  TerminatorI-List[MetaInfo],  TerminatorII-List[MetaInfo],  ...]

これまでのところ、すべての Pipeline-Segment をアクターとして実装しました。filterXXX や consolidate などの一部のアクターは、クエリごとに状態を維持する必要があるため、クエリごとに専用のアクター インスタンスを作成する必要があります。

askIMDB のような関数は、同時に処理したい複数の結果を生成します (それぞれ別のアクターに対して)。したがって、 query() を実行するにアクターのグラフ全体を事前に構築する方法も、実行時にそれを変更するエレガントな方法も見つかりませんでした。

私の最初の試みは、アクターのチェーンと、メッセージ内のトランザクション ID のような sth を渡すことでした。そのため、各アクターには Map[TransactionID->State] がありましたが、これはかなり見苦しく感じました。2 番目の試みは、アクターのダイグラフを 1 つのフローに抽象化する一種のパイプラインを作成することでしたが、これまでのところ失敗しました。

これは私の最初の投稿です。何かを忘れていたり、質問が一般的/疑似コード化されていたりした場合は申し訳ありません。アドバイスをいただければ幸いです。ありがとう!

4

1 に答える 1

4

ほぼ同じことを行うScalaQueryをご覧になることをお勧めします。これはモナド問題なので、そうすることができます。実際、 Scalaz ライブラリによって実装されている Arrows などの一部の Haskell ソリューションは、かなり近いようです。

適切な抽象化により、将来の変更が容易になるため、これが最善の解決策です。

ハックとして、私は次のようなものを考え出します:

abstract class QueryModifiers
case object Consolidate extends QueryModifiers
// create others as appropriate

class Query(title: String) {
  self =>

  // Create actors
  def createActor(qm: QueryModifiers): Actor = {
    val actor = qm match {
      case Consolidate => // create a consolidator actor
      case //... as needed
    }
    actor.start
    actor
  }

  // The pipeline
  val pipe: List[List[QueryModifiers]] = Nil

  // Build the pipeline
  def ->(qms: List[QueryModifiers]) = new Query(title) {
    override val pipe = qms :: self.pipe
  }
  def ->(qm: QueryModifiers) = new Query(title) {
    override val pipe = List(qm) :: self.pipe
  }
  def ->(c: Consolidate.type) = {
    // Define the full pipeline
    // Because the way pipe is built, the last layer comes first, and the first comes last
    val pipeline = Consolidate :: pipe

    // Create an actor for every QueryModifier, using an unspecified createActor function
    val actors = pipeline map (_ map (createActor(_))

    // We have a list of lists of actors now, where the first element of the list
    // was the last QueryModifiers we received; so, group the layers by two, and for each
    // pair, make the second element send the result to the first.
    // Since each layer can contain many actors, make each member of the second
    // layer send the results to each member of the first layer.
    // The actors should be expecting to receive message SendResultsTo at any time.
    for {
      List(nextLayer, previousLayer) <- actors.iterator sliding 2
      nextActor <- nextLayer
      previousActor <- previousLayer
    } previousActor ! SendResultsTo(nextActor)

    // Send the query to the first layer
    for ( firstActor <- actors.last ) firstActor ! Query(title)

    // Get the result from the last layer, which is the consolidator
    val results = actors.head.head !? Results

    // Return the results
    results
  }
}

編集

ちょっとしたトリックで、注文を保証することもできます。ここでは Scala 2.8 を避けようとしていますが、名前付きパラメーターとデフォルト パラメーターを使用すると、はるかに簡単になります。

sealed abstract class QueryModifiers
case class QMSearcher(/*...*/) extends QueryModifiers
case class QMFilter(/*...*/) extends QueryModifiers
case class QMFetcher(/*...*/) extends QueryModifiers
case object Consolidate extends QueryModifiers

class Query[NextQM] private (title: String, searchers: List[QMSeacher], filters: List[QMFilter], fetchers: List[QMFetcher]) {

// Build the pipeline
  def ->[T <: NextQM](qms: List[NextQM])(implicit m: Manifest[T]) = m.toString match {
    case "QMSearch" => new Query[QMFilter](title, qms, Nil, Nil)
    case "QMFilter" => new Query[QMFetcher](title, seachers, qms, Nil)
    case "QMFetcher" => new Query[Consolidate.type](title, searches, filters, qms)
    case _ /* "Consolidate$", actually */ => error("List of consolidate unexpected")
  }
  // Do similarly for qm: NextQM

  // Consolidation
  def ->(qm: Consolidate.type) = {
     // Create Searchers actors
     // Send them the Filters
     // Send them Fetchers
     // Create the Consolidator actor
     // Send it to Searchers actors
     // Send Searchers the query
     // Ask Consolidator for answer
  }
}

object Query {
  def apply(title: String) = new Query[QMSearcher](title, Nil, Nil, Nil)
}

現在、サーチャー アクターはフィルターのリスト、フェッチャーのリスト、およびコンソリデーターへの参照を保持しています。彼らは、これらのことを知らせるメッセージとクエリを聞きます。結果ごとに、リスト内のすべてのフィルターに対してフィルター アクターを作成し、それぞれにフェッチャーとコンソリデーターのリストを送信してから、結果を送信します。

フィルター アクターは、フェッチャーのリストとコンソリデーターへの参照を保持します。彼らは、これらのことを知らせるメッセージに耳を傾け、検索の結果を求めます。出力がある場合は、新しく作成されたフェッチャー アクターに送信します。フェッチャー アクターは、コンソリデーターについて最初に通知されます。

フェッチャーはコンソリデーターへの参照を保持します。それらは、その参照を通知するメッセージと、フィルターからの結果をリッスンします。次に、結果をコンソリデーターに送信します。

コンソリデータは 2 つのメッセージをリッスンします。fetcher アクターからの 1 つのメッセージは、結果を通知し、結果を蓄積します。Query からの別のメッセージがその結果を要求し、それが返されます。

残された唯一のことは、すべての結果が処理されたことをコンソリデーターに知らせる方法を考案することです。1つの方法は次のとおりです。

  1. クエリで、作成されたすべてのサーチャーを Consolidator アクターに通知します。コンソリデーターはそれらのリストを保持し、それらが終了したかどうかを示すフラグを付けます。
  2. 各サーチャーは、作成したフィルターのリストを保持し、それらからの「完了」メッセージを待ちます。サーチャーが行う処理が残っておらず、すべてのフィルターから「完了」を受け取ると、サーチャーは完了したことを通知するメッセージをコンソリデーターに送信します。
  3. 次に、各フィルターは、作成したフェッチャーのリストを保持し、同様に、それらからの「完了」メッセージを待ちます。処理を終了し、すべてのフェッチャーから「完了」を受け取ると、サーチャーに処理が完了したことを通知します。
  4. It fetcher は、その作業が完了して consolidator に送信されると、それを作成したフィルターに「done」メッセージを送信します。
  5. コンソリデータは、すべての検索者から「完了」を受け取った後、結果を照会するメッセージのみをリッスンします。
于 2009-12-17T21:54:06.073 に答える