プレリュード: FaunusGraph と TitanGraph の両方の Gremlin "方言" を数か月使用した経験があり、機能的および構文的な差分をよく認識しています。Faunus スクリプト ステップ ( http://architects.dzone.com/articles/distributed-graph-computing、https://github.com/thinkaurelius/faunus/blob/master/src/main/java/com/thinkaureliusを正常に使用した/faunus/mapreduce/sideeffect/ScriptMap.java ) を使用して、サブグラフの比較的単純な削除と変更を行います。
問題: 複雑なミューテーション スクリプト マップを実装して、プロパティの命名規則に従って、エッジ プロパティをアウト頂点またはイン頂点のいずれかに「移動」させました。私の TitanGraph Gremlin プロトタイプは小さなグラフで動作しますが、スケールアップした実装を動作させることができません: マップは正常に完了しますが、グラフは変更されません (変更をコミットしています)。注: Logger オブジェクトは、プレフィックス引数を表示する最初の INFO メッセージのみを出力しており、エッジ名前空間ガード条件を満たしていないことを示しています (条件なしで実行しましたが、変更はありません)。以下は私のコードです(内部ネットからのファットフィンガーリングなので、タイプミスが発生する可能性があります)
//faunus pipe driver - usage gremlin -e faunus.ns.set-props.grm
import java.io.Console
//get args
console=System.console()
arg=console.readLine('> type <namespace>;<faunus.ns.set-props.mapper_path>;<from_prefix>;<to_prefix>
inargs=arg.split(";")
//establish FaunusGraph connection
f=FaunusFactory.open('titan-client.properties')
f.getConf().set("faunus.graph.input.titan.storage.read-consistency-level", "ALL")
f.getConf().set("faunus.graph.input.titan.storage.write-consistency-level", "ALL")
//Faunus pipe incl. script step
f.V().has("_namespace", inargs[0]).script(inargs[1], inargs[2], inargs[3]
//script map - usage f.V().has("_namespace", <namespace_string>).script(<this_script_path>, <outV_key_prefix_string>, <inV_key_prefix_string>)
def g
def mylog
def setup(args) {
mylog=java.util.logging.Logger.getLogger("script_map")
println("configuring graph ...")
conf=new BaseConfiguration()
conf.setProperty("storage.backend", "cassandra")
conf.setProperty("storage.keyspace", "titan")
conf.setProperty("storage.index.index-name", "titan")
conf.setProperty("storage.hostname", "localhost")
g=TitanFactory.open(conf)
}
def map(v, args) {
mylog.info("*****READ***** args: "+args[0].toString()+", "+args[1].toString())
//fetch all edges incident on Titan vertex corresponding to incoming Faunus vertex
gv=g.v(v.id)
edges=gv.bothE();null
//iterate through incident edges
while(edges.hasNext()) {
e=edges.next()
if (e.hasProperty("_namespace")) { //_namespace removed from previously processed edges
/*fetch terminal vertices of current edge, add incidence & adjacency props
to support metrics and analytics
*/
from=e.getVertex(OUT)
from.setProperty("inV_degree", from.in().count())
from.setProperty("inE_degree", from.inE().count())
from.setProperty("outV_degree" from.out().count())
from.setProperty("outE_degree", from.outE().count())
to=e.getVertex(IN)
to.setProperty("inV_degree", from.in().count())
to.setProperty("inE_degree", from.inE().count())
to.setProperty("outV_degree" from.out().count())
to.setProperty("outE_degree", from.outE().count())
mylog.info("*****READ*****edge id: "+e.id)
mylog.info("*****READ*****edge vertices: from id"+fromid+"; to id: "+to.id)
//fetch property keys of current edge
ekeys=e.getPropertyKeys()
//iterate through edge property keys
for(String ekey:ekeys)
eprop=e.getProperty(ekey) //get value of current property key
goodprop=!(eprop == "" || eprop == null)
mylog.info("*****READ*****edge key/value: "+ekey+"="eprop)
/*determine placement of current key/value on one or neither of the
terminal vertices based on key prefix arges and property value,
remove prefix from re-assigned key/value
*/
if(ekey.startsWith(args[0]) && goodprop) {
vkey=ekey.split(args[0])[1]
if(!from.hasProperty(vkey)) from.setProperty(vkey, eprop)
else {
vprop=from.getProperty(vkey)
if(!vprop.equal(eprop) from.setProperty(vkey, vprop+";"+eprop)
}
mylog.info("*****READ*****from vertex key/value: "+vkey+"="+from.getProperty(vkey)
}
else if(ekey.startsWith(args[1]) && goodprop) {
vkey=ekey.split(args[1])[1]
if(!to.hasProperty(vkey)) to.setProperty(vkey, eprop)
else {
vprop=to.getProperty(vkey)
if(!vprop.equal(eprop) to.setProperty(vkey, vprop+";"+eprop)
}
mylog.info("*****READ*****tovertex key/value: "+vkey+"="+to.getProperty(vkey)
}
//if current edge property key is re-assigned, remove it from the edge
if(ekey.startsWith(args[0]) || ekey.startsWith(args[1])) {
e.removeProperty(ekey)
if(e.hasProperty(ekey) println(ekey+" NOT remvoded from edge")
else println(ekey+ "removed from edge")
}
e.removeProperty("_namespace") // marks edge as processed per outer loop guard
}
}
}
g.commit()
}
def cleanup(args) {
g.shutdown()
}