0

次のように、ReactiveStreams Publisher に基づいて Akka Stream の Source を作成しました。

object FlickrSource {

  val apiKey = Play.current.configuration.getString("flickr.apikey")
  val flickrUserId = Play.current.configuration.getString("flickr.userId")
  val flickrPhotoSearchUrl = s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$apiKey&user_id=$flickrUserId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&page=%s&per_page=500"

  def byDate(date: LocalDate): Source[JsValue, Unit] = {
    Source(new FlickrPhotoSearchPublisher(date))
  }
}

class FlickrPhotoSearchPublisher(date: LocalDate) extends Publisher[JsValue] {

  override def subscribe(subscriber: Subscriber[_ >: JsValue]) {
    try {
      val from = new LocalDate()
      val fromSeconds = from.toDateTimeAtStartOfDay.getMillis
      val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis

      def pageGet(page: Int): Unit = {
        val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds, page)
        Logger.debug("Flickr search request: " + url)
        val photosFound = WS.url(url).get().map { response =>
          val json = response.json
          val photosThisPage = (json \ "photos" \ "photo").as[JsArray]
          val numPages = (json \ "photos" \ "pages").as[JsNumber].value.toInt
          Logger.debug(s"pages: $numPages")
          Logger.debug(s"photos this page: ${photosThisPage.value.size}")
          photosThisPage.value.foreach { photo =>
            Logger.debug(s"onNext")
            subscriber.onNext(photo)
          }

          if (numPages > page) {
            Logger.debug("nextPage")
            pageGet(page + 1)
          } else {
            Logger.debug("onComplete")
            subscriber.onComplete()
          }
        }
      }
      pageGet(1)
    } catch {
      case ex: Exception => {
        subscriber.onError(ex)
      }
    }
  }
}

Flickr に検索要求を行い、結果をJsValues としてソースします。さまざまなフローとシンクに接続しようとしましたが、これが最も基本的なセットアップになります。

val source: Source[JsValue, Unit] = FlickrSource.byDate(date)
val sink: Sink[JsValue, Future[Unit]] = Sink.foreach(println)
val stream = source.toMat(sink)(Keep.right)
stream.run()

onNext数回呼び出され、次にonComplete. ただし、シンクは何も受け取りません。これはソースを作成する有効な方法ではありませんか?

4

1 に答える 1

0

Publisherのような単純なインターフェースObservableで、自分で実装できると勘違いしていました。Akka チームは、これは Publisher を実装する正しい方法ではないことを指摘しました。実際、Publisher は複雑なクラスであり、エンド ユーザーではなくライブラリによって実装されることになっています。質問で使用されているこのSource.apply(Publisher)メソッドは、他の Reactive Streams 実装との相互運用性のために存在します。

Source の実装が必要な目的は、バックプレッシャされたソースが Flickr (リクエストごとに最大 500 である) から検索結果をフェッチするようにすることであり、ダウンストリームで必要とされる以上の (またはより高速な) リクエストを行いたくないということです。これはActorPublisherを実装することで実現できます。

アップデート

これは、私が望むことを行う ActorPublisher です。検索結果を生成する Source を作成しますが、下流で必要な数の REST 呼び出しのみを行います。まだまだ改善の余地があると思いますので、ご自由に編集してください。

import akka.actor.Props
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import org.joda.time.LocalDate
import play.api.Play.current
import play.api.libs.json.{JsArray, JsNumber, JsValue}
import play.api.libs.ws.WS
import play.api.{Logger, Play}

import scala.concurrent.ExecutionContext.Implicits.global

object FlickrSearchActorPublisher {
  val apiKey = Play.current.configuration.getString("flickr.apikey")
  val flickrUserId = Play.current.configuration.getString("flickr.userId")
  val flickrPhotoSearchUrl = s"https://api.flickr.com/services/rest/?method=flickr.photos.search&api_key=$apiKey&user_id=$flickrUserId&min_taken_date=%s&max_taken_date=%s&format=json&nojsoncallback=1&per_page=500&page="

  def byDate(from: LocalDate): Props = {
    val fromSeconds = from.toDateTimeAtStartOfDay.getMillis / 1000
    val toSeconds = from.plusDays(1).toDateTimeAtStartOfDay.getMillis / 1000
    val url = flickrPhotoSearchUrl format (fromSeconds, toSeconds)

    Props(new FlickrSearchActorPublisher(url))
  }
}

class FlickrSearchActorPublisher(url: String) extends ActorPublisher[JsValue] {

  var currentPage = 1
  var numPages = 1
  var photos = Seq[JsValue]()

  def searching: Receive = {
    case Request(count) =>
      Logger.debug(s"Received Request for $count results from Subscriber, ignoring as we are still searching")
    case Cancel =>
      Logger.info("Cancel Message Received, stopping")
      context.stop(self)
    case _ =>
  }

  def accepting: Receive = {
    case Request(count) =>
      Logger.debug(s"Received Request for $count results from Subscriber")
      sendSearchResults()
    case Cancel =>
      Logger.info("Cancel Message Received, stopping")
      context.stop(self)
    case _ =>
  }

  def getNextPageOrStop() {
    if (currentPage > numPages) {
      Logger.debug("No more pages, stopping")
      onCompleteThenStop()
    } else {
      val pageUrl = url + currentPage
      Logger.debug("Flickr search request: " + pageUrl)
      context.become(searching)
      WS.url(pageUrl).get().map { response =>
        val json = response.json
        val photosThisPage = (json \ "photos" \ "photo").as[JsArray]
        numPages = (json \ "photos" \ "pages").as[JsNumber].value.toInt
        Logger.debug(s"page $currentPage of $numPages")
        Logger.debug(s"photos this page: ${photosThisPage.value.size}")
        photos = photosThisPage.value.seq
        if (photos.isEmpty) {
          Logger.debug("No photos found, stopping")
          onCompleteThenStop()
        } else {
          currentPage = currentPage + 1
          sendSearchResults()
          context.become(accepting)
        }
      }
    }
  }

  def sendSearchResults() {
    if (photos.isEmpty) {
      getNextPageOrStop()
    } else {
      while(isActive && totalDemand > 0) {
        onNext(photos.head)
        photos = photos.tail
        if (photos.isEmpty) {
          getNextPageOrStop()
        }
      }
    }
  }

  getNextPageOrStop()
  val receive = searching
}
于 2015-09-01T07:54:29.450 に答える