ローリングコンバットピッチなう!

AIとか仮想化とかペーパークラフトとか

Jubatusでnetflowデータの異常検知

[technology]NetflowのデータをJubatusのアノマリ検知にかけてみる

弊ブログの下記の記事でOpenWRTにsoftflowdを仕掛け、nfcapdとnfdumpでnetflowデータを取得してみました。
rc30-popo.hatenablog.com

このデータを使って、Jubatusのアノマリ検知に掛けてみます。


  1. Jubatusの設定

    config.jsonを下記の通りでJubatusの公式チュートリアルそのままです。

    jubat.us

    {
        "method" : "lof",
        "parameter" : {
         "nearest_neighbor_num" : 10,
         "reverse_nearest_neighbor_num" : 30,
         "method" : "euclid_lsh",
         "parameter" : {
          "hash_num" : 8,
          "table_num" : 16,
          "probe_num" : 64,
          "bin_width" : 10,
          "seed" : 1234
         }
        },
       
        "converter" : {
         "string_filter_types": {},
         "string_filter_rules": [],
         "num_filter_types": {},
         "num_filter_rules": [],
         "string_types": {},
         "string_rules": [{"key":"*", "type":"str", "global_weight" : "bin", "sample_weight" : "bin"}],
         "num_types": {},
         "num_rules": [{"key" : "*", "type" : "num"}]
        }
       }
    

  2. netflowデータの取得

    前回の記事にも書きましたが、自宅でProxyサーバーとして使用しているOpenWrtマシンでsoftflowdを動かして数時間データを取りました。
    取ったデータは下記の様な感じです。

    Date first seen          Duration Proto      Src IP Addr:Port          Dst IP Addr:Port     Flags Tos  Packets    Bytes      pps      bps    Bpp Flows
    2019-04-13 23:43:20.727     0.000 TCP      192.168.11.13:36852 ->   192.168.11.254:8118  ...A.R..   0        1       52        0        0     52     1
    2019-04-13 23:43:21.803     0.136 TCP      192.168.11.13:36914 ->   192.168.11.254:8118  ...APRSF   0        7      594       51    34941     84     1
    2019-04-13 23:43:21.803     0.136 TCP     192.168.11.254:8118  ->    192.168.11.13:36914 ...AP.SF   0        5      541       36    31823    108     1
    2019-04-13 23:43:21.798     0.823 TCP      192.168.11.13:36882 ->   192.168.11.254:8118  ...APRS.   0       10     1387       12    13482    138     1

    nfcapd(netflow collector)で取得したデータをnfdumpを-o extendedオプション付でヒューマンリーダブルなテキスト情報に変換したものです。
    約2万9千フローくらい集めました。
    このデータなんですが、スペース区切りのCSVではなくて、固定長フィールドのデータになっています。また上記のサンプルでは判りませんが、転送パケット数やpps,bps等のデータは100万を超えると'16 M'の様にMとかGで表示されます。数値のMやGの間にはスペースが一個入っています。
    Jubatusに食わせる時はこの辺を考慮してデータを加工する必要があります。(下記のスクリプトではG表示は考慮していません)

  3. pythonスクリプトソース

    Jubatusに上記データを食わせて学習させるためのpythonスクリプトソースです。

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    # Anomaly detection test for netflow data
    
    import signal
    import sys
    from jubatus.anomaly import client
    from jubatus.common import Datum
    import datetime
    
    host = '172.17.0.2'
    port = 9199
    name = 'test'
    
    SAVEID = 'nfdata'
    NEW = True
    TESTMODE = False
    
    MEGA=1000000.0
    GUARD_IP = '192.168.11.254'
    
    USE_DATETIME = False
    
    NFFILE = 'nfdata_learn.txt'
    
    def do_exit(sig, stack):
        print('You pressed Ctrl+C.')
        print('Stop running the job.')
        sys.exit(0)
    
    if __name__ == '__main__':
        signal.signal(signal.SIGINT, do_exit)
    
    # set jubatus server
        anom = client.Anomaly(host, port, name)
    
        if NEW == True and TESTMODE == False:
    #   clear current data in Jubatus server
            anom.clear()
        else:
    #   load last learned data
            anom.load(SAVEID)
    
        with open(NFFILE, mode='r') as file:
            for line in file:
                if line.startswith('Date'):
                    pass
                elif line.startswith('Summary'):
                    break
                else:
    #                dt,tm,duration,proto,src,arrow,dst,flags,tos,packets,bt,pps,bps,bpp,flows = line[:-1].split()
    #                srcip,srcport = src.split(':')
    #                dstip,dstport = dst.split(':')
                    datum = Datum()
                    dt = line[0:10].strip()
                    tm = line[11:23].strip()
    #               Convert date & time to UNIX epoch
                    dtobj = datetime.datetime.strptime(dt + ' ' + tm + '000','%Y-%m-%d %H:%M:%S.%f')
                    dtepoch = dtobj.timestamp()
    
                    duration = line[24:33].strip()
                    proto = line[34:39].strip()
                    srcip = line[40:56].strip()
                    srcport = line[57:62].strip()
                    dstip = line[66:82].strip()
                    dstport = line[83:88].strip()
                    flags = line[89:97].strip()
                    tos = line[98:102].strip()
                    packets = line[103:111].strip()
                    bt = line[112:120].strip()
                    pps = line[121:129].strip()
                    bps = line[130:138].strip()
                    bpp = line[139:145].strip()
                    flows = line[146:151].strip()
    #                print(dt,tm,duration,proto,srcip,srcport,dstip,dstport,flags,tos,packets,bt,pps,bps,flows)a
                    if dstip == GUARD_IP:
                        if USE_DATETIME:
                            datum.add_number('datetime',dtepoch)
                        datum.add_number('duration',float(duration))
                        datum.add_string('proto',proto)
                        datum.add_string('srcip',srcip)
    #                    datum.add_string('srcport',srcport)
    #                    datum.add_string('dstip',dstip)
                        datum.add_string('dstport',dstport)
    #                    datum.add_string('flags',flags)
    #                   Decode flags into indivisual TCP flag
                        if 'A' in flags:
                            datum.add_number('ack',1.0)
                        else:
                            datum.add_number('ack',0.0)
                        if 'S' in flags:
                            datum.add_number('syn',1.0)
                        else:
                            datum.add_number('syn',0.0)
                        if 'F' in flags:
                            datum.add_number('fin',1.0)
                        else:
                            datum.add_number('fin',0.0)
                        if 'R' in flags:
                            datum.add_number('rst',1.0)
                        else:
                            datum.add_number('rst',0.0)
                        if 'P' in flags:
                            datum.add_number('psh',1.0)
                        else:
                            datum.add_number('psh',0.0)
                        if 'U' in flags:
                            datum.add_number('urgent',1.0)
                        else:
                            datum.add_number('urgent',0.0)
    #                    datum.add_string('tos',tos)
                        datum.add_number('packets',float(packets))
                        if bt.endswith(' M'):
                            datum.add_number('bytes',float(bt[0:-2]) * MEGA)
                        else:
                            datum.add_number('bytes',float(bt))
    #                    if pps.endswith(' M'):
    #                        datum.add_number('pps',float(pps[0:-2]) * MEGA)
    #                    else:
    #                        datum.add_number('pps',float(pps))
    #                    if bps.endswith(' M'):
    #                        datum.add_number('bps',float(bps[0:-2]) * MEGA)
    #                    else:
    #                        datum.add_number('bps',float(bps))
    #                    datum.add_number('bpp',float(bpp))
    #                    datum.add_number('flows',float(flows))
    
                        if not TESTMODE:
                            ret = anom.add(datum)
    #                        if ret.score != float('Inf'):
                            print('{},{},{},{},{},{},{},{},{},{}'.format(ret.id,ret.score,dt + ' ' + tm,duration,proto,srcip,srcport,dstip,dstport,flags))
                        else:
                            score = anom.calc_score(datum)
    #                        if score != float('Inf'):
                            print('{},{},{},{},{},{},{},{},{},{}'.format(-1,score,dt + ' ' + tm,duration,proto,srcip,srcport,dstip,dstport,flags))
    
    # save learned data
        if not TESTMODE:
            anom.save(SAVEID)
    

    このスクリプトでは、softflowdを動かしたOpenWrtマシン(192.168.11.254)がdst IPとなっているフローのみを学習させます。
    いくつかの定数を書き換える事で動作が変わります。

    • NFFILE

      nfdumpコマンドでテキストに変換したnetflowデータを格納したファイルのファイル名です。
    • USE_DATETIME

      Trueの場合は、日付時刻情報をJubatusに食わせます。今回はFalseとしています。アノマリ検知の場合、時刻も単なる数値と入れますが、この場合「時刻」という概念をもたないため、時間あたりのアクセス数等はJubatusは見ないためです。(うまく説明できていませんね...)例えば特定のsrc IPからのアクセス数の急激な増大等をチェックするにはアノマリ検知でなくJubatusのバースト検知を使うのかな?と思っています。アノマリ検知はあくまでも1つのDatum内に入っているデータの組み合わせの類似度を見るものだと理解しています。
      またflagsは最初は纏めて文字列として扱っていましたが、それぞれ異なる意味を持つ独立したフラグなので、それぞれを「1.0」ないし「0.0」の数値表現に変換しています。
    • NEW

      Trueの場合は新規の学習します。Falseの場合は、以前に保存した学習データをロードしてから学習を再開します。学習が終わると学習データを保存します。
    • TESTMODE

      Trueの場合、Datumをadd()メソッドで学習データとして追加する変わりにcalc_score()メソッドで類似度スコアのみを計算、表示します。学習後のテスト用に使用します。
    ちなみにこのスクリプト、やたらとコメントアウトが多いですが、最初はnetflowデータの全フィールドをjubatusに入力するつもりだったのですが、異常アクセスの検知にあまり役に立たない、むしろ混乱させそうなデータの入力を省略しています。
    例えば通常src portはランダムに空いているところから選択されるので、通信の特徴を表さないと判断して使っていません。一方でdst portは多くの場合、アプリ固有のポート番号を示すので通信の特徴を示すと判断して使用しています。
  4. Jubatusの起動と学習の実行

    自分はdockerでjubatusを使っているので、下記の様に起動しました。

    $ docker run --expose 9199 -v $PWD:/tmp/config jubatus/jubatus jubaanomaly -f /tmp/config/config.json -d /tmp/config

    そしてjubatusを起動したのとは別の端末を開き、

    $ python nf_anomaly.py

    の様に先ほどのpythonスクリプトを起動します。
    実行すると下記の様に最初はやたらにinfがスコアとして出てきますが、徐々に1.0付近のスコアが出力される様になります。
    下記は全部192.168.11.254上で起動しているproxy(ポート8118を使用)へのアクセスです。

    3795,inf,2019-04-13 23:43:20.727,0.000,TCP,192.168.11.13,36852,192.168.11.254,8118,...A.R..
    3796,inf,2019-04-13 23:43:21.803,0.136,TCP,192.168.11.13,36914,192.168.11.254,8118,...APRSF
    3797,inf,2019-04-13 23:43:21.798,0.823,TCP,192.168.11.13,36882,192.168.11.254,8118,...APRS.
    3798,inf,2019-04-13 23:43:21.803,0.198,TCP,192.168.11.13,36916,192.168.11.254,8118,...APRSF
    3799,inf,2019-04-13 23:43:21.803,0.437,TCP,192.168.11.13,36918,192.168.11.254,8118,...APRSF
    〜中略〜
    5846,1.2876165700290432,2019-04-13 23:52:37.678,0.006,TCP,192.168.11.13,38270,192.168.11.254,8118,...A...F
    5847,0.9999999993751156,2019-04-13 23:52:37.679,0.006,TCP,192.168.11.13,38272,192.168.11.254,8118,...A...F
    5848,1.092236330642958,2019-04-13 23:52:37.697,0.023,UDP,1.1.1.1,53,192.168.11.254,52072,........
    5849,1.0562889007250853,2019-04-13 23:52:37.697,0.004,UDP,1.0.0.1,53,192.168.11.254,52072,........

  5. テストの実行

    学習用とは別に、softflowdを動かしているOpenWrtマシンに対してnmapでスキャンをかけてnetflowデータを取り、これをjubatusに掛けてみます。
    jubatusにかけるときは先ほどのスクリプト内のTESTMODEをFalse→Trueに変更します。

    下記に様にSYNのみのTCPセッションに大きなスコアが出ています。(192.168.11.252がnmapを実行したマシン)
    ただしnmapによるスキャンと思われるSYNのみのTCPセッションすべてに大きなスコアが出ているわけも無いです。

    -1,929594.3746033125,2019-04-13 13:22:02.149,0.000,TCP,192.168.11.252,57232,192.168.11.254,8888,......S.
    -1,929594.3756006836,2019-04-13 13:22:02.149,0.001,TCP,192.168.11.252,39680,192.168.11.254,113,......S.
    -1,929594.3746033125,2019-04-13 13:22:02.150,0.000,TCP,192.168.11.252,35692,192.168.11.254,5900,......S.
    -1,929594.3746033125,2019-04-13 13:22:02.150,0.000,TCP,192.168.11.252,36882,192.168.11.254,135,......S.
    -1,1.320740745133347,2019-04-13 13:22:02.150,0.000,TCP,192.168.11.252,33674,192.168.11.254,143,......S.
    

    雰囲気的にはflagのパターンで通常の通信で無いパターンを識別できている気もするのですが、netflowの情報からだとflagだけをヒューリスティックに解析するだけで十分な気もします。
    上の方に書きましたが、実用的に異常通信の検知を行うには、アノマリだけでなくバースト検知なんかも組み合わせる必要がありそうです。

というわけで正直、満足行く結果まで達していないのですが、一応netflowで取得したフロー情報をJubatusで解析してみました。
今回は1フローづづそのまま入力したのですが、同じソースIPからのフローを一定時間毎に束ねて統計取るとか、前処理も考えた方が良いのかもしれません。