3G ネットワーク(GPRS トンネリング プロトコル)でのデータ トラフィック ログの処理に Flink Stream を使用しています。また、ユーザーのユーザーセッションでの情報の統合に問題があります。
例: 1 つのセッションの開始と終了をマップする方法。そのような複雑なプロトコルを処理するのに適した Flink ストリーミングがあることを知りませんか?
p/s:
3G ネットワークで SGSN と GGSN の間で交換されるデータをキャプチャします (GTP-C/U メッセージで GTP プロトコルを使用します)。セッションは、SGSN がCreateReq (TEID、Seq、IMSI、TEID_dl、TEID_data_dl)メッセージを送信し、GGSNが CreateRsp(TEID_dl、Seq、TEID_ul、TEID_data_ul)メッセージを応答するときに開始されます。セッションが確立された後、SGSN から GGSN に送信されるその他の GTP-C メッセージ (例: UpdateReq、DeleteReq) は TEID_ul を使用し、応答メッセージは TEID_dl を使用し、GTP-U メッセージは TEID_data_ul (SGSN -> GGSN) および TEID_data_dl (GGSN -> SGSN) を使用します。 )。GTP-U メッセージには、AppID (facebook、twitter、web)、url などの情報が含まれています...
最後に、継続的なログ データ ストリームを処理し、GTP-C メッセージと同じ 1 人のユーザーの GTP-U (IMSI ) レポートを作成します。
私はこれを試しました:
val sessions = createReqs.connect(createRsps).flatMap(new CoFlatMapFunction[CreateReq, CreateRsp, Session] {
// holds CreateReqs indexed by (tedid_dl,seq)
private val createReqs = mutable.HashMap.empty[(String, String), CreateReq]
// holds CreateRsps indexed by (tedid,seq)
private val createRsps = mutable.HashMap.empty[(String, String), CreateRsp]
override def flatMap1(req: CreateReq, out: Collector[Session]): Unit = {
val key = (req.teid_dl, req.header.seqNum)
val oRsp = createRsps.get(key)
if (!oRsp.isEmpty) {
val rsp = oRsp.get
println("OK")
out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
createRsps.remove(key)
} else {
createReqs.put(key, req)
}
}
override def flatMap2(rsp: CreateRsp, out: Collector[Session]): Unit = {
val key = (rsp.header.teid, rsp.header.seqNum)
val oReq = createReqs.get(key)
if (!oReq.isEmpty) {
val req = oReq.get
out.collect(new Session(rsp.header.time, req.imsi, req.teid_dl, req.teid_ddl, rsp.teid_upl, rsp.teid_dupl, req.rat, req.apn))
createReqs.remove(key)
} else {
createRsps.put(key, rsp)
}
}
}).print()
このコードは常に空の結果を返します。入力ストリームに同じセッションの CreateRsp および CreateReq メッセージが含まれていること。それらは非常に接近して表示されます (1 秒以内)。デバッグすると、毎回 oReq.isEmpty == trueになります。私が間違っているのは何ですか?