1

いくつかの先物を連鎖させるためのエレガントな解決策を見つけるのに苦労しています。実装しようとしているメソッドは次のようになります (キャッシュの一部です)。

def acquire(key: A, producer: => Future[B]): Future[B]

そして、アルゴリズムはおおよそ次のとおりです。

  • キーがロックされている場合は、実行時例外を通じてすぐに (Future.failed を使用することの進歩はありますか?)
  • future { getOrRefresh }それ以外の場合は、キーを取得するのに時間がかかるため、ブロックを開きます
  • getOrRefreshどちらかがストレートを返し、Bそれが先物とメソッドの結果です
  • またはそれを実行する必要がありますproducer

最後のステップは、未来の中から未来を「平坦化」する必要があることを意味します。つまり、 はできないので、outer.flatMapを使うのが戦略だと思いますAwait

現在、メソッドを使用して を取得するか、 を使用して巻き戻したをAwait取得できるという点で、奇妙な統合失調症があります。問題は、 の場合、外側の未来を完了する前にロックを解除する必要があるため、 に固執する必要があるということです。Option[Try[B]]readyBresultFailureAwait.ready

これは醜くなります:

val fut   = producer
val prod  = Await.ready(fut, Duration.Inf).value.get
if (prod.isFailure) sync.synchronized { locked = false }
prod.get

これはそんなに醜いことができますか?これを行うためのより良い方法があるはずです。


繰り返しになりますが、Future[B]実行中から、 で完了するいくつかのピア Future を実行しB、ピアの結果を返しますが、失敗した場合は、メインの Future を完了する前にロックをクリーンアップします。

4

2 に答える 2

0

recover(With)ここでは、 を使用して回避しようとしてAwaitいます。それでも、例外を再スローする必要があるため、不格好に見えます

import concurrent._

trait Cache[A, B] {
  class Entry(var locked: Boolean = true)

  private var map  = Map.empty[A, Entry]
  private val sync = new AnyRef

  implicit def exec: ExecutionContext

  def readEntry(key: A): Option[B]

  def acquire(key: A, producer: => Future[B]): Future[B] = sync.synchronized {
    map.get(key) match {
      case Some(e) =>
        if (e.locked) throw new IllegalStateException()
        e.locked     = true
        val existing = future { readEntry(key).get }
        val refresh  = existing.recoverWith {
          case _: NoSuchElementException => producer
        }
        refresh.recover {
          case t => sync.synchronized(e.locked = false); throw t
        }

      case _ => producer.map { value =>
        sync.synchronized {
          map += key -> new Entry()
        }
        value
      }
    }
  }
}

提案がある場合は、別の回答として投稿してください。

于 2013-04-02T23:09:49.080 に答える
0

実装にいくつかの変更を加えました。

最初に、あなたのイニシャルthrowon locked を に変えましたFuture.failed。なぜなら、future の消費者は、注意すべき唯一の失敗は失敗した future であると安全に想定できるはずだからです。

get次に、Option[B]fromを呼び出すのではなくreadEntry、future の結果として返します。次に、 (マップは Future[Future[B]] になります)の場合にflatMap生成される未来を置き換えることができるように、結果を取得します。の場合は、値からa を返します。これは、future を返す必要があるためです。producerNoneSomeFuture.successfulflatMap

recover最後に、 andthrowを anに置き換えました。これはandThen、failedFutureを伝播させず、エントリのロックを解除するために副作用を未来に連鎖させたいだけだからです。

trait Cache[A, B] {
  class Entry(var locked: Boolean = true)

  private var map  = Map.empty[A, Entry]
  private val sync = new AnyRef

  def readEntry(key: A): Option[B] = ???

  def acquire(key: A, producer: => Future[B]): Future[B] = sync.synchronized {
    map.get(key) match {
      case Some(e) =>
        if (e.locked)
          Future.failed(new IllegalStateException())
        else {
        e.locked = true
        future { readEntry(key)}.flatMap {
          case None => producer.andThen {
            case Failure(_) => sync.synchronized(e.locked = false)
          }
          case Some(value) => Future.successful(value)
        }
      }

      case _ => producer.map { value =>
        sync.synchronized {
          map += key -> new Entry()
        }
        value
      }
    }
  }
}
于 2015-04-23T20:55:17.930 に答える