0

アプリケーションの負荷テストを行えるように、Scala Futures を使用して Amazon Kinesis に非同期でメッセージを書き込もうとしています。

このコードは機能し、データがパイプラインを下って移動し、出力がコンソールに表示されることを確認できます。

import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}    

object KinesisDummyDataProducer extends App {

  val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
  println("Connected")

  lazy val encoder = Charset.forName("UTF-8").newEncoder()
  lazy val tz = TimeZone.getTimeZone("UTC")
  lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
  df.setTimeZone(tz)

  (1 to args(0).toInt).map(int => send(int)).map(msg => println(msg))

  private def send(int: Int) = {
    val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
    val bytes = encoder.encode(CharBuffer.wrap(msg))
    encoder.flush(bytes)
    kinesis.putRecord("PrimaryEventStream", bytes, "123")
    msg
  }
}

このコードは Scala Futures で動作します。

import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global

def doIt(x: Int) = {Thread.sleep(1000); x + 1}
(1 to 10).map(x => future{doIt(x)}).map(y => y.onSuccess({case x => println(x)}))

構文はシーケンスのマッピングとほぼ同じであることに注意してください。ただし、次の方法は機能しません (つまり、コンソールに出力することも、パイプラインにデータを送信することもありません)。

import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global


object KinesisDummyDataProducer extends App {

  val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
  println("Connected")

  lazy val encoder = Charset.forName("UTF-8").newEncoder()
  lazy val tz = TimeZone.getTimeZone("UTC")
  lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
  df.setTimeZone(tz)

  (1 to args(0).toInt).map(int => future {send(int)}).map(f => f.onSuccess({case msg => println(msg)}))

  private def send(int: Int) = {
    val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
    val bytes = encoder.encode(CharBuffer.wrap(msg))
    encoder.flush(bytes)
    kinesis.putRecord("PrimaryEventStream", bytes, "123")
    msg
  }
}

このプロジェクトに関するその他のメモ。私は Maven を使用して (コマンド ラインから) ビルドを行っており、上記のすべてのコードを (コマンド ラインからも) 実行するとうまくいきます。

私の質問は次のとおりです。同じ構文を使用すると、関数「send」が実行されていないように見えるのはなぜですか?

4

0 に答える 0