1

各ブロックに含まれるトランザクションを格納するための別の Cassandra テーブルを含めるようにアプリケーションを拡張しようとしています。

コード スニペットを簡潔で関連性のあるものに保つように努めました。さらにコード コンテキストが必要な場合は、お知らせください。

phantomVersion = "1.22.0" cassandraVersion = "2.1.4"


以下にリストされているコードで、次のコンパイル エラーが発生します。洞察力は大歓迎です。

[error] /home/dan/projects/open-blockchain/scanner/src/main/scala/org/dyne/danielsan/openblockchain/data/database/Database.scala:30: overloaded method value add with alternatives:
[error]   (batch: com.websudos.phantom.batch.BatchQuery[_])com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified] <and>
[error]   (queries: Iterator[com.websudos.phantom.builder.query.Batchable with com.websudos.phantom.builder.query.ExecutableStatement])(implicit session: com.datastax.driver.core.Session)com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified] <and>
[error]   (queries: com.websudos.phantom.builder.query.Batchable with com.websudos.phantom.builder.query.ExecutableStatement*)(implicit session: com.datastax.driver.core.Session)com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified] <and>
[error]   (query: com.websudos.phantom.builder.query.Batchable with com.websudos.phantom.builder.query.ExecutableStatement)(implicit session: com.datastax.driver.core.Session)com.websudos.phantom.batch.BatchQuery[com.websudos.phantom.builder.Unspecified]
[error]  cannot be applied to (scala.concurrent.Future[com.datastax.driver.core.ResultSet])
[error]       .add(ChainDatabase.bt.insertNewBlockTransaction(bt))
[error]        ^
[error] one error found
[error] (compile:compileIncremental) Compilation failed
[error] Total time: 6 s, completed Aug 9, 2016 2:42:30 PM

GenericBlockModel.scala:

case class BlockTransaction(hash: String, txid: String)

sealed class BlockTransactionModel extends CassandraTable[BlockTransactionModel, BlockTransaction] {

  override def fromRow(r: Row): BlockTransaction = {
    BlockTransaction(
      hash(r),
      txid(r)
    )
  }

  object hash extends StringColumn(this) with PartitionKey[String]

  object txid extends StringColumn(this) with ClusteringOrder[String] with Descending

}

abstract class ConcreteBlockTransactionModel extends BlockTransactionModel with RootConnector {

  override val tableName = "block_transactions"

  def insertNewBlockTransaction(bt: BlockTransaction): Future[ResultSet] = insertNewRecord(bt).future()

  def insertNewRecord(bt: BlockTransaction) = {
    insert
      .value(_.hash, bt.hash)
      .value(_.txid, bt.txid)
  }
}

データベース.scala

class Database(val keyspace: KeySpaceDef) extends DatabaseImpl(keyspace) {

  def insertBlock(block: Block) = {
    Batch.logged
      .add(ChainDatabase.block.insertNewRecord(block))
      .future()
  }

  def insertTransaction(tx: Transaction) = {
    Batch.logged
      .add(ChainDatabase.tx.insertNewTransaction(tx))
      .future()
  }

  def insertBlockTransaction(bt: BlockTransaction) = {
    Batch.logged
      .add(ChainDatabase.btx.insertNewBlockTransaction(bt))
      .future()
  }

  object block extends ConcreteBlocksModel with keyspace.Connector

  object tx extends ConcreteTransactionsModel with keyspace.Connector

  object btx extends ConcreteBlockTransactionsModel with keyspace.Connector


}

object ChainDatabase extends Database(Config.keySpaceDefinition) 
4

1 に答える 1

2

エラーは明らかに、 にクエリが必要なときに、Futureをに追加しようとしていることです。すでにクエリをトリガーしている場合、それをバッチ処理することはできません。そのため、1 ステップ前に停止する必要があります。方法は次のとおりです。BatchBatch

def insertNewRecord(
  bt: BlockTransaction
): InsertQuery.Default[BlockTransactionModel, BlockTransaction] = {
  insert
    .value(_.hash, bt.hash)
    .value(_.txid, bt.txid)
}

以下を使用して、複数のレコードをバッチに追加できるようになりました。

Batch.logged.add(insertNewRecord(record1)
 .add(insertNewRecord(record2))
 // etc

別の注記としてbatch、Cassandra の a は並列挿入を行うために使用されるのではなく、原子性を保証するために使用されるため、一般に通常の並列挿入よりも少なくとも 30% 遅くなります。詳しくはこちらをお読みください。

より多くのものを同時に挿入したい場合は、次のような未来を返すメソッドを使用できます。

def insertMany(
  list: List[BlockTransaction]
): Future[List[ResultSet]] = {
  Future.sequence(list.map(insertNewRecord(_).future()))
}
于 2016-08-09T06:37:52.687 に答える