OAuth2 トークンを提供し、期限切れのトークンの更新も処理する Source を作成しようとしています。現在、私のコードは次のようになっています
case class Token(expires: Instant = Instant.now().plus(100, ChronoUnit.MILLIS)){
def expired = Instant.now().isAfter(expires)
}
Source
.repeat()
.mapAsync(1){ _ =>
println(" -> token req")
// this fakes an async token request to the token service
Future{
Thread.sleep(500)
println(" <- token resp")
Token()
}
}
.mapAsync(1){ token =>
println(" -> req with token auth")
if(token.expired){
println("!!! Received expired token")
}
// this is the actual call that needs the token
println("making call")
Future{
Thread.sleep(2000)
println(" <- req resp")
"OK"
}
}
.take(2)
.runWith(Sink.ignore)
.recover{case e => ()}
.flatMap{ _ =>
system.terminate()
}
このコードの出力は次のようになります
root -> token req
root <- token resp
root -> token req
root -> req with token auth
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root <- token resp
root -> token req
root <- token resp
root -> token req
root <- token resp
root <- req resp
root -> req with token auth
root !!! Received expired token
root making call
root ... finished with exit code 0
明らかに、この mapAsync(1) は予期しないときに需要を生み出しています (プリフェッチ?)
2 つの問題があります。
- 要求により、上流で不要なトークン要求が発生します
- トークンのプリフェッチ/キャッシュは、特定の時間のみ有効であるため問題があります。
では、この関数のように動作する真のプル ストリームを作成するにはどうすればよいでしょうか?
def tokenSource: () => Future[Token]