3

私は現在、ラズベリーパイで実験しています。パケット検出ソフトウェアである Snort を実行しています。Snort が警告を発する場合、(Python) スクリプトを実行したいと思います。

Snort は、次のようにラズベリー pi で実行されます。

sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf

呼び出されたときにラズベリー パイの GPIO ピンを制御する Python スクリプトを作成しました。それをより文脈に取り入れるために。ラズベリー パイが ping/ICMP パケットを受信すると、赤色のアラーム ライトが点灯し、同じデバイスによって制御されます。

Snort ルールは現在機能しており、ICMP パケットが到着すると、アラートがコンソールに出力されます。ただし、snort で python スクリプトを実行する方法がわかりません

4

3 に答える 3

1

Below are 3 options, hopefully one will work:

  • "Strict" subprocess approach using subprocess's PIPEs
  • Approach using pexpect -- "Pexpect is a pure Python module for spawning child applications; controlling them; and responding to expected patterns in their output." -- not that this is the only non-standard package you would have to grab separately from the default python install.
  • Approach using pseudoterminals and good-old select to read file descriptors

Each of the approaches is bundled in a try_[SOME APPROACH] function. You should be able to update the 3 parameters at the top, then comment/uncomment one approach at the bottom to give it a shot.

It may be worthwhile testing both halves independently. In other words snort + my rpi.py (below). Then, if that works, my timed_printer.py (below) and your python script to toggle the RPi GPIO. If they both work independently, then you can be confident not much will need to be done to get the entire workflow operating properly.

Code

import subprocess

_cmd_lst = ['python', '-u', 'timed_printer.py']  # sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
_rpi_lst = ['python', '-u', 'rpi.py']            # python script that toggles RPi
_alert = 'TIME'                                  # The keyword you're looking for
                                                 #  in snort output

#===============================================================================

# Simple helper function that calls the RPi toggle script
def toggle_rpi():
    subprocess.call(_rpi_lst)



def try_subprocess(cmd_lst, alert, rpi_lst):
    p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=subprocess.PIPE, bufsize=1)

    try:
        while True:
            for line in iter(p.stdout.readline, b''):
                print("try_subprocess() read: %s" % line.strip())

                if alert in line:
                    print("try_subprocess() found alert: %s" % alert)
                    toggle_rpi()

    except KeyboardInterrupt:   print(" Caught Ctrl+C -- killing subprocess...")
    except Exception as ex:     print ex
    finally:
        print("Cleaning up...")
        p.kill()
        print("Goodbye.")



def try_pexpect(cmd_lst, alert, rpi_lst):
    import pexpect # http://pexpect.sourceforge.net/pexpect.html

    p = pexpect.spawn(' '.join(cmd_lst))

    try:
        while True:
            p.expect(alert)     # This blocks until <alert> is found in the output of cmd_str
            print("try_pexpect() found alert: %s" % alert)
            toggle_rpi()

    except KeyboardInterrupt:   print(" Caught Ctrl+C -- killing subprocess...")
    except Exception as ex:     print ex
    finally:
        print("Cleaning up...")
        p.close(force=True)
        print("Goodbye.")



def try_pty(cmd_lst, alert, rpi_lst, MAX_READ=2048):
    import pty, os, select

    mfd, sfd = pty.openpty()

    p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=sfd, bufsize=1)

    try:
        while True:
            rlist, _, _, = select.select([mfd], [], [])

            if rlist:
                data = os.read(mfd, MAX_READ)
                print("try_pty() read: %s" % data.strip())
                if not data:
                    print("try_pty() got EOF -- exiting")
                    break
                if alert in data:
                    print("try_pty() found alert: %s" % alert)
                    toggle_rpi()
            elif p.poll() is not None:
                print("try_pty() had subprocess end -- exiting")
                break

    except KeyboardInterrupt:   print(" Caught Ctrl+C -- killing subprocess...")
    except Exception as ex:     print ex
    finally:
        print("Cleaning up...")
        os.close(sfd)
        os.close(mfd)
        p.kill()
        print("Goodbye.")

#===============================================================================

try_subprocess(_cmd_lst, _alert, _rpi_lst)
#try_pexpect(_cmd_lst, _alert, _rpi_lst)
#try_pty(_cmd_lst, _alert, _rpi_lst)

Testing notes

To emulate your snort script (a script that "hangs", then prints something, then goes back to hanging, etc), I wrote this simple python script that I called timed_printer.py:

import time
while True:
    print("TIME: %s" % time.time())
    time.sleep(5)

And my rpi.py file was simply:

print("TOGGLING OUTPUT PIN")

There's no explicit output flushing here, in an attempt to best emulate normal output.

Final considerations

The first approach will read an entire line at a time. So if you expect your alert to be contained within a single line, you'll be fine.

The second approach (pexpect) will block until alert is encountered.

The third approach will read as soon as data is available, which I should point out is not necessarily a full line. If you see try_pty() read: with fragments of the snort output lines, causing you to miss alerts, you'll need to add some sort of buffering solution.

Docs

References: 1, 2

于 2015-03-08T16:55:14.163 に答える