5

Akka Futures 内で tinkerpop を使用できるかどうかを知りたいのですが、これまでのところ、変更をグラフにコミットしても保持されません。tinkerpop はスレッド ローカル ライブラリであることを理解しています。つまり、将来的にスレッドを再度設定する必要があるということです。ODatabaseRecordThreadLocal.INSTANCE.set(thread)

成功せずに次の方法を試しました:

def test[T](graphChanges: => T): T = {
    val thread = ODatabaseRecordThreadLocal.INSTANCE.get
    try graphChanges finally {
      ODatabaseRecordThreadLocal.INSTANCE.set(thread)
      GraphPool.get("partitioned").commit
    }
}

// collect tinkerpop frames
test {
  future {
  // add changes to my tinkerpop frames
  }
}

play.mvc.Http.Context ごとに Tinkerpop スレッドを使用したいと思います

これが私が達成したいサンプルプロジェクトです: https://github.com/D-Roch/tinkerpop-play

4

2 に答える 2

7

問題

問題は、Tinkerpop がスレッド ローカルで動作することです。したがって、変更は現在のスレッドにのみコミットされます。Scala フューチャを作成するときは、フューチャが実行されるスレッドを環境に選択させます。そして、環境はそれをよく知らないので、間違ったスレッドを選択します。

この問題は Akka 先物でも同様です。

未来はどのスレッドで実行されますか?

Future を作成するときは、次の 2 つのパラメーターを使用して作成します。

  1. 実行するブロック
  2. ブロックを実行する実行コンテキスト

2 番目のパラメーターは通常、暗黙的なパラメーターとして指定されます。ただし、デフォルトをオーバーライドできます。

解決

Tinkerpop を扱う先物を作成するときは、同じスレッドですべてのブロックを実行する実行コンテキストを使用します。

例:

import scala.concurrent.ExecutionContext
import java.util.concurrent.Executors

implicit val ec=ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor)

future { 
    println(Thread.currentThread); 
    future {
        println(Thread.currentThread)
    }  
}

このコードは、同じスレッド ID をコンソールに 2 回出力します (Java 7 および Scala 2.10.2 でテスト済み)。

重要:このような小さなスレッド プールを使用すると、簡単にデッド ロックやスタベーションが発生する可能性があります。Tinkerpop とのやり取りにのみ使用してください。

ブロックをパラメーターとして取り、tinkerpop スレッドで実行されるフューチャーを返す特別なメソッド tinkerpopFuture を提供することができます。または、すべての tinkerpop インタラクションをカプセル化する特別なアクターを作成できます (そして、特別な tinkerpop 実行コンテキストでそれらを実行します)。

文学

于 2014-01-08T09:03:36.830 に答える
4

これは、Tinkerpop に固有のもののようには見えません。Future を使用する際によくあるエラーのようです。このフラグメントを検討してください:

try graphChanges finally { ... }

graphChangesそれだけでもいいのですが、ここに未来をつくっていることもわかります。そう...

  • graphChangesFuture を開始し、すぐに戻ります
  • tryブロックが完了し、ブロックfinallyが実行されます
  • これの直前、または直後、またはおそらく並行して、しかしほぼ確実に別のスレッドで、Futureが実行されます。

私のアドバイスは、非同期ロジックを 内に移動してtest、正しいスレッド アフィニティを確認し、すべての呼び出しが として正しくフラグ付けされるようにすることblockingです。このような:

def test[T](graphChanges: => T): Future[T] = future {
  blocking {
    val tlocal = ODatabaseRecordThreadLocal.INSTANCE
    val dbrecord = tlocal.get

    try graphChanges finally {
      tlocal.set(dbrecord)
      GraphPool.get("partitioned").commit
    }
  }
}

// collect tinkerpop frames
test {
  // add changes to my tinkerpop frames
}
于 2014-01-02T17:29:25.750 に答える