2

OAuth2 トークンを提供し、期限切れのトークンの更新も処理する Source を作成しようとしています。現在、私のコードは次のようになっています

  case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)){
    def expired = Instant.now().isAfter(expires)
  }

  Source
    .repeat()
    .mapAsync(1){ _ =>
      println("  -> token req")
      // this fakes an async token request to the token service
      Future{
        Thread.sleep(500)
        println("  <- token resp")
        Token()
      }
    }
    .mapAsync(1){ token =>
      println("  -> req with token auth")
      if(token.expired){
        println("!!! Received expired token")
      }
      // this is the actual call that needs the token
      println("making call")
      Future{
        Thread.sleep(2000)
        println("  <- req resp")
        "OK"
      }
    }
    .take(2)
    .runWith(Sink.ignore)
    .recover{case e => ()}
    .flatMap{ _ =>
      system.terminate()
    }

このコードの出力は次のようになります

root   -> token req
root   <- token resp
root   -> token req
root   -> req with token auth
root making call
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- req resp
root   -> req with token auth
root !!! Received expired token
root making call
root   <- token resp
root   -> token req
root   <- token resp
root   -> token req
root   <- token resp
root   <- req resp
root   -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0

明らかに、この mapAsync(1) は予期しないときに需要を生み出しています (プリフェッチ?)

2 つの問題があります。

  • 要求により、上流で不要なトークン要求が発生します
  • トークンのプリフェッチ/キャッシュは、特定の時間のみ有効であるため問題があります。

では、この関数のように動作する真のプル ストリームを作成するにはどうすればよいでしょうか?

def tokenSource: () => Future[Token]

4

2 に答える 2

3

scala.collection.immutable.Streamプリフェッチとキューイングを意図的に回避しようとしている場合は、 akka Stream よりも Iterator の方が優れたソリューション だと思います。

以下は、質問で列挙した落とし穴を回避する実装例です。(注: を使用しActorSystemて、 を介して ExecutionContext を作成しdispatcher、呼び出しが完了する前にアプリケーションが終了するのを防ぎましたsleep。メイン関数が最後に到達したからといって ActorSystem がシャットダウンしないという事実を利用しています。式定義の。)

import scala.collection.immutable.Stream
import scala.concurrent.Future

object ScalaStreamTest extends App {    
  case class Token(expires: Long = System.currentTimeMillis() + 100){
    def expired = System.currentTimeMillis() > expires
  }

  val actorSystem = akka.actor.ActorSystem()      
  import actorSystem.dispatcher

  def createToken =  Future {
    Thread.sleep(500)
    println("  <- token resp")
    Token()
  }

  def checkExpiration(token : Future[Token]) = token map { t =>
    println("  -> req with token auth")
    if(t.expired){println("!!! Received expired token")}
    t
  }

  def makeCall(token : Future[Token]) = token flatMap { t =>
    println("making call")
    Future {
      Thread.sleep(2000)
      println("  <- req resp")
      "OK"
    }
  }

  val stream = Stream.continually(createToken)
                     .map(checkExpiration)
                     .map(makeCall)
                     .take(2)
                     .force
}//end object ScalaStreamTest

Stream は遅延しているforceため、呼び出しが必要です。したがって、強制の前のすべてのメソッド呼び出し (つまり、継続的に、マップ、およびテイク) も遅延しています。レデューサーが呼び出されるか、ストリームが強制的に明示的に指示されない限り、遅延ストリームで計算は行われません。

于 2015-10-30T16:38:23.607 に答える