Akka Streams と Alpakka を使用して、Amazon SQS から読み取り、Elasticsearch でイベントのインデックスを作成するコードをいくつか作成しました。すべてがスムーズに機能し、パフォーマンスは素晴らしいのですが、インデックス名に問題があります。私はこのコードを持っています:
class ElasticSearchIndexFlow(restClient: RestClient) {
private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)
def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
restClient,
DomainEventMarshaller.domainEventWrites
)
private def index = {
val now = DateTime.now()
s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
}
}
問題は、フローを数日実行した後、インデックス名が変更されないことです。Akka Streams は内部で融合アクターを作成しindex
、インデックス名を取得する関数は実行の開始時にのみ評価されると想像します。
現在の日付に従ってインデックス名を使用して ES のイベントにインデックスを付けるにはどうすればよいでしょうか?