0

これらの%が新しいイベントで更新され続けるように、ネットワークトラフィックに出現するプロトコルの%を継続的に計算したいと考えています。円グラフが生成され、パーセンテージで更新されます。計算には新しいデータと以前のデータの両方が必要なので、インメモリ テーブルを使用してイベントを長期間 (たとえば 1 日) 保持することにしました。

イベント テーブルはイベント ストリームと結合した場合にのみ使用できるため、古い値も取得するために外部結合を選択しました。プロトコルとそのパーセンテージだけに関心があるので、2 つの列だけが必要ですが、外部結合で集計関数を適用できません。これまでに生成したクエリは次のとおりです。

@Import('MAINInStream:1.0.0')
define stream MAINInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string, service string, duration double, orig_bytes long, resp_bytes long, conn_state string, local_orig bool, local_resp bool, missed_bytes long, history string, orig_pkts long, orig_ip_bytes long, resp_pkts long, resp_ip_bytes long, tunnel_parents string, sensorname string);

@Export('ProtocolStream:1.0.0')
define stream ProtocolStream (protocol string, count int);

define table mem_conn_table (timestamp long, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string);

from MAINInStream
select time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp, id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
insert into intermediateStream;

from MAINInStream
select time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp, id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
group by id_resp_p
insert into mem_conn_table;

from intermediateStream#window.externalTimeBatch(timestamp,1min, timestamp, 1min) as i right outer join mem_conn_table[time:dateDiff(time:currentTimestamp(),cast(timestamp,"string"), "yyyy-MM-dd HH:mm:ss", "yyyy-MM-dd HH:mm:ss") == 0] as mc
on  i.timestamp == mc.timestamp 
SELECT (ifThenElse(mc.id_resp_p == 21,'FTP', ifThenElse(mc.id_resp_p == 22,'SSH', ifThenElse(mc.id_resp_p == 25,'SMTP', ifThenElse(mc.id_resp_p == 445,'SMB','MYSQL')))))  as protocol , cast(count(mc.id_resp_p),'int') as count
insert into ProtocolStream;

外部の 1 分でウィンドウをバッチ処理してから、プロトコルとそのカウントを取得していますが、出力が得られません。

助言がありますか?

4

1 に答える 1

0

インメモリ テーブルでは外部結合を使用できません。必要に応じて、メモリ内テーブルに存在するイベントを中間ストリームに発行し、それを結合に使用できます (ガイド)。ただし、シナリオではexternalTime、イベント テーブルを使用する代わりにウィンドウを使用できます。以下のようなことを試してください。

@Import('MAINInStream:1.0.0')
define stream MAINInStream (ts string, uid string, id_orig_h string, id_orig_p int, id_resp_h string, id_resp_p int, proto string, service string, duration double, orig_bytes long, resp_bytes long, conn_state string, local_orig bool, local_resp bool, missed_bytes long, history string, orig_pkts long, orig_ip_bytes long, resp_pkts long, resp_ip_bytes long, tunnel_parents string, sensorname string);

@Export('ProtocolStream:1.0.0')
define stream ProtocolStream (protocol string, count long);

@Export('PercentageStream:1.0.0')
define stream PercentageStream (protocol string, count long, percentage double);


from MAINInStream
select 
    time:timestampInMilliseconds(time:dateAdd(str:replaceAll(ts,'T',' '), 5, 'hour',"yyyy-MM-dd HH:mm:ss"),'yyyy-MM-dd HH:mm') as timestamp, 
    (ifThenElse(mc.id_resp_p == 21,'FTP', ifThenElse(mc.id_resp_p == 22,'SSH', ifThenElse(mc.id_resp_p == 25,'SMTP', ifThenElse(mc.id_resp_p == 445,'SMB','MYSQL')))))  as protocol
    id_orig_h, id_orig_p, id_resp_h, id_resp_p, proto
insert into intermediateStream;

from intermediateStream#window.externalTime(timestamp, 1 day)
select timestamp, count() as totalCount
insert into totalCountStream;

from intermediateStream#window.externalTime(timestamp, 1 day)
select timestamp, protocol, count() as count
group by protocol
insert into perProtocolCountStream;

from perProtocolCountStream
select protocol, count
insert into ProtocolStream;

from totalCountStream#window.time(1 min) as tcs join perProtocolCountStream#window.time(1 min) as pcs
select pcs.protocol, pcs.count as count, ((pcs.count/tcs.totalCount)) * 100 as percentage
    on tcs.timestamp == pcs.timestamp
insert into PercentageStream;
于 2016-08-29T08:37:51.870 に答える