[technology]NetflowのデータをJubatusのアノマリ検知にかけてみる
弊ブログの下記の記事でOpenWRTにsoftflowdを仕掛け、nfcapdとnfdumpでnetflowデータを取得してみました。
rc30-popo.hatenablog.com
このデータを使って、Jubatusのアノマリ検知に掛けてみます。
- Jubatusの設定
config.jsonを下記の通りでJubatusの公式チュートリアルそのままです。{ "method" : "lof", "parameter" : { "nearest_neighbor_num" : 10, "reverse_nearest_neighbor_num" : 30, "method" : "euclid_lsh", "parameter" : { "hash_num" : 8, "table_num" : 16, "probe_num" : 64, "bin_width" : 10, "seed" : 1234 } }, "converter" : { "string_filter_types": {}, "string_filter_rules": [], "num_filter_types": {}, "num_filter_rules": [], "string_types": {}, "string_rules": [{"key":"*", "type":"str", "global_weight" : "bin", "sample_weight" : "bin"}], "num_types": {}, "num_rules": [{"key" : "*", "type" : "num"}] } }
- netflowデータの取得
前回の記事にも書きましたが、自宅でProxyサーバーとして使用しているOpenWrtマシンでsoftflowdを動かして数時間データを取りました。
取ったデータは下記の様な感じです。Date first seen Duration Proto Src IP Addr:Port Dst IP Addr:Port Flags Tos Packets Bytes pps bps Bpp Flows 2019-04-13 23:43:20.727 0.000 TCP 192.168.11.13:36852 -> 192.168.11.254:8118 ...A.R.. 0 1 52 0 0 52 1 2019-04-13 23:43:21.803 0.136 TCP 192.168.11.13:36914 -> 192.168.11.254:8118 ...APRSF 0 7 594 51 34941 84 1 2019-04-13 23:43:21.803 0.136 TCP 192.168.11.254:8118 -> 192.168.11.13:36914 ...AP.SF 0 5 541 36 31823 108 1 2019-04-13 23:43:21.798 0.823 TCP 192.168.11.13:36882 -> 192.168.11.254:8118 ...APRS. 0 10 1387 12 13482 138 1
nfcapd(netflow collector)で取得したデータをnfdumpを-o extendedオプション付でヒューマンリーダブルなテキスト情報に変換したものです。
約2万9千フローくらい集めました。
このデータなんですが、スペース区切りのCSVではなくて、固定長フィールドのデータになっています。また上記のサンプルでは判りませんが、転送パケット数やpps,bps等のデータは100万を超えると'16 M'の様にMとかGで表示されます。数値のMやGの間にはスペースが一個入っています。
Jubatusに食わせる時はこの辺を考慮してデータを加工する必要があります。(下記のスクリプトではG表示は考慮していません)
- pythonスクリプトソース
Jubatusに上記データを食わせて学習させるためのpythonスクリプトソースです。#!/usr/bin/env python # -*- coding: utf-8 -*- # Anomaly detection test for netflow data import signal import sys from jubatus.anomaly import client from jubatus.common import Datum import datetime host = '172.17.0.2' port = 9199 name = 'test' SAVEID = 'nfdata' NEW = True TESTMODE = False MEGA=1000000.0 GUARD_IP = '192.168.11.254' USE_DATETIME = False NFFILE = 'nfdata_learn.txt' def do_exit(sig, stack): print('You pressed Ctrl+C.') print('Stop running the job.') sys.exit(0) if __name__ == '__main__': signal.signal(signal.SIGINT, do_exit) # set jubatus server anom = client.Anomaly(host, port, name) if NEW == True and TESTMODE == False: # clear current data in Jubatus server anom.clear() else: # load last learned data anom.load(SAVEID) with open(NFFILE, mode='r') as file: for line in file: if line.startswith('Date'): pass elif line.startswith('Summary'): break else: # dt,tm,duration,proto,src,arrow,dst,flags,tos,packets,bt,pps,bps,bpp,flows = line[:-1].split() # srcip,srcport = src.split(':') # dstip,dstport = dst.split(':') datum = Datum() dt = line[0:10].strip() tm = line[11:23].strip() # Convert date & time to UNIX epoch dtobj = datetime.datetime.strptime(dt + ' ' + tm + '000','%Y-%m-%d %H:%M:%S.%f') dtepoch = dtobj.timestamp() duration = line[24:33].strip() proto = line[34:39].strip() srcip = line[40:56].strip() srcport = line[57:62].strip() dstip = line[66:82].strip() dstport = line[83:88].strip() flags = line[89:97].strip() tos = line[98:102].strip() packets = line[103:111].strip() bt = line[112:120].strip() pps = line[121:129].strip() bps = line[130:138].strip() bpp = line[139:145].strip() flows = line[146:151].strip() # print(dt,tm,duration,proto,srcip,srcport,dstip,dstport,flags,tos,packets,bt,pps,bps,flows)a if dstip == GUARD_IP: if USE_DATETIME: datum.add_number('datetime',dtepoch) datum.add_number('duration',float(duration)) datum.add_string('proto',proto) datum.add_string('srcip',srcip) # datum.add_string('srcport',srcport) # datum.add_string('dstip',dstip) datum.add_string('dstport',dstport) # datum.add_string('flags',flags) # Decode flags into indivisual TCP flag if 'A' in flags: datum.add_number('ack',1.0) else: datum.add_number('ack',0.0) if 'S' in flags: datum.add_number('syn',1.0) else: datum.add_number('syn',0.0) if 'F' in flags: datum.add_number('fin',1.0) else: datum.add_number('fin',0.0) if 'R' in flags: datum.add_number('rst',1.0) else: datum.add_number('rst',0.0) if 'P' in flags: datum.add_number('psh',1.0) else: datum.add_number('psh',0.0) if 'U' in flags: datum.add_number('urgent',1.0) else: datum.add_number('urgent',0.0) # datum.add_string('tos',tos) datum.add_number('packets',float(packets)) if bt.endswith(' M'): datum.add_number('bytes',float(bt[0:-2]) * MEGA) else: datum.add_number('bytes',float(bt)) # if pps.endswith(' M'): # datum.add_number('pps',float(pps[0:-2]) * MEGA) # else: # datum.add_number('pps',float(pps)) # if bps.endswith(' M'): # datum.add_number('bps',float(bps[0:-2]) * MEGA) # else: # datum.add_number('bps',float(bps)) # datum.add_number('bpp',float(bpp)) # datum.add_number('flows',float(flows)) if not TESTMODE: ret = anom.add(datum) # if ret.score != float('Inf'): print('{},{},{},{},{},{},{},{},{},{}'.format(ret.id,ret.score,dt + ' ' + tm,duration,proto,srcip,srcport,dstip,dstport,flags)) else: score = anom.calc_score(datum) # if score != float('Inf'): print('{},{},{},{},{},{},{},{},{},{}'.format(-1,score,dt + ' ' + tm,duration,proto,srcip,srcport,dstip,dstport,flags)) # save learned data if not TESTMODE: anom.save(SAVEID)
このスクリプトでは、softflowdを動かしたOpenWrtマシン(192.168.11.254)がdst IPとなっているフローのみを学習させます。
いくつかの定数を書き換える事で動作が変わります。
- NFFILE
nfdumpコマンドでテキストに変換したnetflowデータを格納したファイルのファイル名です。
- USE_DATETIME
Trueの場合は、日付時刻情報をJubatusに食わせます。今回はFalseとしています。アノマリ検知の場合、時刻も単なる数値と入れますが、この場合「時刻」という概念をもたないため、時間あたりのアクセス数等はJubatusは見ないためです。(うまく説明できていませんね...)例えば特定のsrc IPからのアクセス数の急激な増大等をチェックするにはアノマリ検知でなくJubatusのバースト検知を使うのかな?と思っています。アノマリ検知はあくまでも1つのDatum内に入っているデータの組み合わせの類似度を見るものだと理解しています。
またflagsは最初は纏めて文字列として扱っていましたが、それぞれ異なる意味を持つ独立したフラグなので、それぞれを「1.0」ないし「0.0」の数値表現に変換しています。
- NEW
Trueの場合は新規の学習します。Falseの場合は、以前に保存した学習データをロードしてから学習を再開します。学習が終わると学習データを保存します。
- TESTMODE
Trueの場合、Datumをadd()メソッドで学習データとして追加する変わりにcalc_score()メソッドで類似度スコアのみを計算、表示します。学習後のテスト用に使用します。
例えば通常src portはランダムに空いているところから選択されるので、通信の特徴を表さないと判断して使っていません。一方でdst portは多くの場合、アプリ固有のポート番号を示すので通信の特徴を示すと判断して使用しています。
- NFFILE
- Jubatusの起動と学習の実行
自分はdockerでjubatusを使っているので、下記の様に起動しました。$ docker run --expose 9199 -v $PWD:/tmp/config jubatus/jubatus jubaanomaly -f /tmp/config/config.json -d /tmp/config
そしてjubatusを起動したのとは別の端末を開き、
$ python nf_anomaly.py
の様に先ほどのpythonスクリプトを起動します。
実行すると下記の様に最初はやたらにinfがスコアとして出てきますが、徐々に1.0付近のスコアが出力される様になります。
下記は全部192.168.11.254上で起動しているproxy(ポート8118を使用)へのアクセスです。3795,inf,2019-04-13 23:43:20.727,0.000,TCP,192.168.11.13,36852,192.168.11.254,8118,...A.R.. 3796,inf,2019-04-13 23:43:21.803,0.136,TCP,192.168.11.13,36914,192.168.11.254,8118,...APRSF 3797,inf,2019-04-13 23:43:21.798,0.823,TCP,192.168.11.13,36882,192.168.11.254,8118,...APRS. 3798,inf,2019-04-13 23:43:21.803,0.198,TCP,192.168.11.13,36916,192.168.11.254,8118,...APRSF 3799,inf,2019-04-13 23:43:21.803,0.437,TCP,192.168.11.13,36918,192.168.11.254,8118,...APRSF 〜中略〜 5846,1.2876165700290432,2019-04-13 23:52:37.678,0.006,TCP,192.168.11.13,38270,192.168.11.254,8118,...A...F 5847,0.9999999993751156,2019-04-13 23:52:37.679,0.006,TCP,192.168.11.13,38272,192.168.11.254,8118,...A...F 5848,1.092236330642958,2019-04-13 23:52:37.697,0.023,UDP,1.1.1.1,53,192.168.11.254,52072,........ 5849,1.0562889007250853,2019-04-13 23:52:37.697,0.004,UDP,1.0.0.1,53,192.168.11.254,52072,........
- テストの実行
学習用とは別に、softflowdを動かしているOpenWrtマシンに対してnmapでスキャンをかけてnetflowデータを取り、これをjubatusに掛けてみます。
jubatusにかけるときは先ほどのスクリプト内のTESTMODEをFalse→Trueに変更します。下記に様にSYNのみのTCPセッションに大きなスコアが出ています。(192.168.11.252がnmapを実行したマシン)
ただしnmapによるスキャンと思われるSYNのみのTCPセッションすべてに大きなスコアが出ているわけも無いです。-1,929594.3746033125,2019-04-13 13:22:02.149,0.000,TCP,192.168.11.252,57232,192.168.11.254,8888,......S. -1,929594.3756006836,2019-04-13 13:22:02.149,0.001,TCP,192.168.11.252,39680,192.168.11.254,113,......S. -1,929594.3746033125,2019-04-13 13:22:02.150,0.000,TCP,192.168.11.252,35692,192.168.11.254,5900,......S. -1,929594.3746033125,2019-04-13 13:22:02.150,0.000,TCP,192.168.11.252,36882,192.168.11.254,135,......S. -1,1.320740745133347,2019-04-13 13:22:02.150,0.000,TCP,192.168.11.252,33674,192.168.11.254,143,......S.
雰囲気的にはflagのパターンで通常の通信で無いパターンを識別できている気もするのですが、netflowの情報からだとflagだけをヒューリスティックに解析するだけで十分な気もします。
上の方に書きましたが、実用的に異常通信の検知を行うには、アノマリだけでなくバースト検知なんかも組み合わせる必要がありそうです。
今回は1フローづづそのまま入力したのですが、同じソースIPからのフローを一定時間毎に束ねて統計取るとか、前処理も考えた方が良いのかもしれません。