アプリケーションの負荷テストを行えるように、Scala Futures を使用して Amazon Kinesis に非同期でメッセージを書き込もうとしています。
このコードは機能し、データがパイプラインを下って移動し、出力がコンソールに表示されることを確認できます。
import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
object KinesisDummyDataProducer extends App {
val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
println("Connected")
lazy val encoder = Charset.forName("UTF-8").newEncoder()
lazy val tz = TimeZone.getTimeZone("UTC")
lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
df.setTimeZone(tz)
(1 to args(0).toInt).map(int => send(int)).map(msg => println(msg))
private def send(int: Int) = {
val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
val bytes = encoder.encode(CharBuffer.wrap(msg))
encoder.flush(bytes)
kinesis.putRecord("PrimaryEventStream", bytes, "123")
msg
}
}
このコードは Scala Futures で動作します。
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global
def doIt(x: Int) = {Thread.sleep(1000); x + 1}
(1 to 10).map(x => future{doIt(x)}).map(y => y.onSuccess({case x => println(x)}))
構文はシーケンスのマッピングとほぼ同じであることに注意してください。ただし、次の方法は機能しません (つまり、コンソールに出力することも、パイプラインにデータを送信することもありません)。
import com.amazonaws.services.kinesis.AmazonKinesisClient
import java.nio.CharBuffer
import java.nio.charset.Charset
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone}
import scala.concurrent.future
import scala.concurrent.ExecutionContext.Implicits.global
object KinesisDummyDataProducer extends App {
val kinesis = new AmazonKinesisClient(PipelineConfig.awsCredentials)
println("Connected")
lazy val encoder = Charset.forName("UTF-8").newEncoder()
lazy val tz = TimeZone.getTimeZone("UTC")
lazy val df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'Z")
df.setTimeZone(tz)
(1 to args(0).toInt).map(int => future {send(int)}).map(f => f.onSuccess({case msg => println(msg)}))
private def send(int: Int) = {
val msg = "{\"event_name\":\"test\",\"timestamp\":\"%s\",\"int\":%s}".format(df.format(new Date()), int.toString)
val bytes = encoder.encode(CharBuffer.wrap(msg))
encoder.flush(bytes)
kinesis.putRecord("PrimaryEventStream", bytes, "123")
msg
}
}
このプロジェクトに関するその他のメモ。私は Maven を使用して (コマンド ラインから) ビルドを行っており、上記のすべてのコードを (コマンド ラインからも) 実行するとうまくいきます。
私の質問は次のとおりです。同じ構文を使用すると、関数「send」が実行されていないように見えるのはなぜですか?