2

akka I/O の使用方法を学習しながら、akka I/O の上に単純なプロトコルを実装しようとしており、こちらのドキュメントに従っていました。

ただし、私のgradleファイルでは、以下に示すようにバージョン2.3.9を使用しています

dependencies {
    compile group: 'org.slf4j', name: 'slf4j-log4j12', version: '1.7.7'
    compile group: 'com.typesafe.akka', name: 'akka-actor_2.11', version: '2.3.9'
    compile group: 'com.typesafe.akka', name: 'akka-contrib_2.11', version: '2.3.9'
    compile group: 'org.scala-lang', name: 'scala-library', version: '2.11.5'
    testCompile group: 'junit', name: 'junit', version: '4.11'
}

パイプライン固有のいくつかのもののインポート

import akka.io.SymmetricPipelineStage;
import akka.io.PipelineContext;
import akka.io.SymmetricPipePair;

generate はシンボル エラーを解決できません。

したがって、私の質問。

  1. これらが削除されたか、gradle ファイルに追加する必要がある依存関係があります。
  2. それらが削除された場合、エンコード/デコード段階はどのように処理されますか?
4

2 に答える 2

6

パイプラインは実験的なもので、実際には Akka 2.3 で削除されました。削除については、移行ガイド 2.2.x から 2.3.xに記載されています。

「古い」パイプライン実装を Akka 2.3 hereでパッケージ化できることについても言及されていますが、依存関係の単純な追加ではないようです。

Akka Streams は、Akka 2.4 で提供されるパイプラインのより優れた代替品となることが意図されていますが、実験的なモジュールとして現在利用可能であることに賭けます。エンコード/デコード ステージまたはプロトコル レイヤーは、Akka Streams を Akka I/O と組み合わせて使用​​することで処理できます。

于 2015-03-28T17:24:05.287 に答える
0

はい、パイプラインは代替手段なしで削除されました。私は Netty の世界から来ましたが、パイプラインが「直感的でない」とは思いません。パイプラインはバッファを蓄積し、すぐに使用できるメッセージを子アクターに提供します。

私たちのソリューションを見てください"org.scalaz" %% "scalaz-core" % 7.2.14。依存関係として必要です。

Codec クラスはState、アクターによって呼び出されて出力を生成するモナドです。私たちのプロジェクトでは を使用しているVarint32 protobuf encodingため、すべてのメッセージの前にvarint32長さフィールドが追加されます。

import com.google.protobuf.CodedInputStream
import com.trueaccord.scalapb.{GeneratedMessage, GeneratedMessageCompanion, Message}
import com.zeptolab.tlc.front.codecs.Varint32ProtoCodec.ProtoMessage

import scalaz.{-\/, State, \/, \/-}

trait Accumulator
trait Codec[IN, OUT] {

  type Stream = State[Accumulator, Seq[IN]]

  def decode(buffer: Array[Byte]): Throwable \/ IN

  def encode(message: OUT): Array[Byte]

  def emptyAcc: Accumulator

  def decodeStream(data: Array[Byte]): Stream

}

object Varint32ProtoCodec {

  type ProtoMessage[T] = GeneratedMessage with Message[T]

  def apply[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) = new Varint32ProtoCodec[IN, OUT](protoType)

}

class Varint32ProtoCodec[IN <: ProtoMessage[IN], OUT <: ProtoMessage[OUT]](protoType: GeneratedMessageCompanion[IN]) extends Codec[IN, OUT] {

  import com.google.protobuf.CodedOutputStream

  private case class AccumulatorImpl(expected: Int = -1, buffer: Array[Byte] = Array.empty) extends Accumulator

  override def emptyAcc: Accumulator = AccumulatorImpl()

  override def decode(buffer: Array[Byte]): Throwable \/ IN = {
    \/.fromTryCatchNonFatal {
      val dataLength = CodedInputStream.newInstance(buffer).readRawVarint32()
      val bufferLength = buffer.length
      val dataBuffer = buffer.drop(bufferLength - dataLength)
      protoType.parseFrom(dataBuffer)
    }
  }

  override def encode(message: OUT): Array[Byte] = {
    val messageBuf = message.toByteArray
    val messageBufLength = messageBuf.length
    val prependLength = CodedOutputStream.computeUInt32SizeNoTag(messageBufLength)
    val prependLengthBuffer = new Array[Byte](prependLength)
    CodedOutputStream.newInstance(prependLengthBuffer).writeUInt32NoTag(messageBufLength)
    prependLengthBuffer ++ messageBuf
  }

  override def decodeStream(data: Array[Byte]): Stream = State {
    case acc: AccumulatorImpl =>
      if (data.isEmpty) {
        (acc, Seq.empty)
      } else {
        val accBuffer = acc.buffer ++ data
        val accExpected = readExpectedLength(accBuffer, acc)
        if (accBuffer.length >= accExpected) {
          val (frameBuffer, restBuffer) = accBuffer.splitAt(accExpected)
          val output = decode(frameBuffer) match {
            case \/-(proto) => Seq(proto)
            case -\/(_) => Seq.empty
          }
          val (newAcc, recOutput) = decodeStream(restBuffer).run(emptyAcc)
          (newAcc, output ++ recOutput)
        } else (AccumulatorImpl(accExpected, accBuffer), Seq.empty)
      }
    case _ => (emptyAcc, Seq.empty)
  }

  private def readExpectedLength(data: Array[Byte], acc: AccumulatorImpl) = {
    if (acc.expected == -1 && data.length >= 1) {
      \/.fromTryCatchNonFatal {
        val is = CodedInputStream.newInstance(data)
        val dataLength = is.readRawVarint32()
        val tagLength = is.getTotalBytesRead
        dataLength + tagLength
      }.getOrElse(acc.expected)
    } else acc.expected
  }

}

アクターは次のとおりです。

import akka.actor.{Actor, ActorRef, Props}
import akka.event.Logging
import akka.util.ByteString
import com.zeptolab.tlc.front.codecs.{Accumulator, Varint32ProtoCodec}
import com.zeptolab.tlc.proto.protocol.{Downstream, Upstream}

object FrameCodec {
  def props() = Props[FrameCodec]
}

class FrameCodec extends Actor {

  import akka.io.Tcp._

  private val logger       = Logging(context.system, this)
  private val codec        = Varint32ProtoCodec[Upstream, Downstream](Upstream)
  private val sessionActor = context.actorOf(Session.props())

  def receive = {
    case r: Received =>
      context become stream(sender(), codec.emptyAcc)
      self ! r
    case PeerClosed => peerClosed()
  }

  private def stream(ioActor: ActorRef, acc: Accumulator): Receive = {
    case Received(data) =>
      val (next, output) = codec.decodeStream(data.toArray).run(acc)
      output.foreach { up =>
        sessionActor ! up
      }
      context become stream(ioActor, next)
    case d: Downstream =>
      val buffer = codec.encode(d)
      ioActor ! Write(ByteString(buffer))
    case PeerClosed => peerClosed()
  }

  private def peerClosed() = {
    logger.info("Connection closed")
    context stop self
  }

}
于 2017-08-02T09:55:11.770 に答える