1

「GetExtendedTcpTable」の使用に問題があります。スクリプトを実行しようとすると、次のようなメッセージが表示されます。

AssertionError: [エラー 0] 操作は正常に完了しました

スクリプトが正常に動作することはめったにありません。「操作が完了しました。何が問題なのですか?」というメッセージがわかりません。

これはコードです、私は実行しようとしました:

from ctypes import *
from ctypes.wintypes import *
from socket import  inet_aton,   inet_ntoa,  htons


AF_INET = 2
TCP_TABLE_BASIC_LISTENER = 0
TCP_TABLE_BASIC_CONNECTIONS = 1
TCP_TABLE_BASIC_ALL = 2
TCP_TABLE_OWNER_PID_LISTENER = 3
TCP_TABLE_OWNER_PID_CONNECTIONS = 4
TCP_TABLE_OWNER_PID_ALL = 5
TCP_TABLE_OWNER_MODULE_LISTENER = 6
TCP_TABLE_OWNER_MODULE_CONNECTIONS = 7
TCP_TABLE_OWNER_MODULE_ALL = 8

# for storing socket info python style.  
class socket_info:

    State = None
    LocalAddr = None
    LocalPort = None
    RemoteAddr = None
    RemotePort = None

    def __init__ (self, **kwargs):

        for key, word in kwargs.items():
            setattr(self, key, word)

def formatip (ip):
    ip = inet_aton (str(ip))
    return inet_ntoa (ip[::-1])

states = {
    1 : "TCP_STATE_CLOSED",
    2 : "TCP_STATE_LISTEN",
    3 : "TCP_STATE_SYN_SENT",
    4 : "TCP_STATE_SYN_RCVD",
    5 : "TCP_STATE_ESTAB",
    6 : "TCP_STATE_FIN_WAIT",
    7 : "TCP_STATE_FIN_WAIT2",
    8 : "TCP_STATE_CLOSE_WAIT",
    9 : "TCP_STATE_CLOSING",
    10 : "TCP_STATE_LAST_ACK",
    11 : "TCP_STATE_TIME_WAIT",
    12 : "TCP_STATE_DELETE_TCB",

    "TCP_STATE_CLOSED" : 1,
    "TCP_STATE_LISTEN" : 2,
    "TCP_STATE_SYN_SENT" : 3,
    "TCP_STATE_SYN_RCVD" : 4,
    "TCP_STATE_ESTAB" : 5,
    "TCP_STATE_FIN_WAIT" : 6,
    "TCP_STATE_FIN_WAIT2" : 7,
    "TCP_STATE_CLOSE_WAIT" : 8,
    "TCP_STATE_CLOSING" : 9,
    "TCP_STATE_LAST_ACK" :10,
    "TCP_STATE_TIME_WAIT" : 11,
    "TCP_STATE_DELETE_TCB" : 12 }

class MIB_TCPROW_OWNER_PID(Structure):
    _fields_ = [
        ("dwState", DWORD),
        ("dwLocalAddr", DWORD),
        ("dwLocalPort", DWORD),
        ("dwRemoteAddr", DWORD),
        ("dwRemotePort", DWORD),
        ("dwOwningPid", DWORD)
        ]


class MIB_TCPTABLE_OWNER_PID(Structure):
    _fields_ = [
        ("dwNumEntries", DWORD),
        ("MIB_TCPROW_OWNER_PID", MIB_TCPROW_OWNER_PID * 100)
        ]



def GetExtendedTcpTable (vip=AF_INET):
    table = MIB_TCPTABLE_OWNER_PID ()
    so = sizeof (table)
    size = DWORD (so)
    order = c_int(1)

    failure= windll.iphlpapi.GetExtendedTcpTable (
        byref (table),
        addressof (size),
        order,
        vip,
        TCP_TABLE_OWNER_PID_ALL,
        0    )

    assert not failure,  WinError (GetLastError ())

    pytables = []
    tables = table.MIB_TCPROW_OWNER_PID

    for index in range(table.dwNumEntries):
        table = tables [index]
        pytables.append (
            socket_info (
                State=states.get (table.dwState, "UNKNOWN_STATE_%s" %(str(table.dwState))),
                LocalAddr=formatip (table.dwLocalAddr),
                LocalPort=htons(table.dwLocalPort),
                RemoteAddr=formatip (table.dwRemoteAddr),
                RemotePort=htons(table.dwRemotePort),
                OwningPid = int (table.dwOwningPid)
            )
        )
    return pytables


def GetTcpTableForPid (pid):
    tables = GetExtendedTcpTable ()
    for table in tables:
        if table.OwningPid == pid: return table
    raise "Cannot find tcp table for pid %s" %pid

dict_process = {}
pid_set =set()
pid_list = []
tcp_info_list = []
tcp_info = GetExtendedTcpTable()
for item in tcp_info:
    LocalAddr = item.LocalAddr
    LocalPort = item.LocalPort
    RemoteAddr = item.RemoteAddr
    RemotePort = item.RemotePort
    OwningPid = item.OwningPid
    print('local Addr: '+ LocalAddr,'local port: '+ str(LocalPort),'remote Addr: ' + RemoteAddr, 'Remote Port: ' + str(RemotePort), OwningPid)

スクリプトは時々実行されます。5 分間実行しても、このばかげたミスで約 1 時間は機能しません。それを回避する方法は?

私は本当に知りません、それとは何ですか。助けてください、私は何が間違っていますか?

Win7 SP1 x64でpython 3.2を使用しています

本当にありがとうございました!

4

1 に答える 1

2

を使用しないでくださいaddressof(size)。これは、32 ビット C としてキャストされる Python 整数を返しますintbyref(size)64 ビット Python を使用している場合は 64 ビット値になるポインターを作成するために使用します。

GetExtendedTcpTableを呼び出しませんSetLastErrorDWORD次のいずれかのコードを含む を返します。

NO_ERROR = 0
ERROR_INVALID_PARAMETER = 87
ERROR_INSUFFICIENT_BUFFER = 122

pdwSizeバッファーが小さすぎる場合は、引数に必要なサイズがあります。ここでのオプションの 1 つは、長さ 0 の配列から始めることです。次にresize構造体。最後castに配列を正しいサイズにします。

class MIB_TCPTABLE_OWNER_PID(Structure):
    _fields_ = [
        ("dwNumEntries", DWORD),
        ("MIB_TCPROW_OWNER_PID", MIB_TCPROW_OWNER_PID * 0),
    ]

_GetExtendedTcpTable = windll.iphlpapi.GetExtendedTcpTable

def GetExtendedTcpTable(vip=AF_INET):
    table = MIB_TCPTABLE_OWNER_PID()
    size = DWORD() 
    order = 1

    failure = _GetExtendedTcpTable(
                  byref(table),
                  byref(size),
                  order,
                  vip,
                  TCP_TABLE_OWNER_PID_ALL,
                  0)

    if failure == ERROR_INSUFFICIENT_BUFFER:
        resize(table, size.value)
        memset(byref(table), 0, sizeof(table))
        failure = _GetExtendedTcpTable(
                      byref(table),
                      byref(size),
                      order,
                      vip,
                      TCP_TABLE_OWNER_PID_ALL,
                      0)

    if failure: 
        raise WinError(failure)

    ptr_type = POINTER(MIB_TCPROW_OWNER_PID * table.dwNumEntries)
    tables = cast(table.MIB_TCPROW_OWNER_PID, ptr_type)[0]

    pytables = []
    for table in tables:
        # rest unchanged

Win32 の値に関しては、一般に Python ではLastError依存すべきではありません。以前の呼び出しからの古いエラー コードが表示されているのか、それとも間にある呼び出しによって値GetLastErrorが変更されたのかはわかりません。LastErrorを使用する 1 つの API 呼び出しをチェックしている場合は、呼び出しが失敗したかどうかをその直後LastErrorにチェックしても問題ありません。GetLastErrorしかし、より一般的には、次のように DLL をロードする必要がある場合がありますuse_last_error=True

iphlpapi = WinDLL('iphlpapi', use_last_error=True)

WinDLLこのインスタンスから作成された関数ポインターはLastError、呼び出しが返された直後にスレッド ローカル ストレージに保存されます。呼び出すget_last_errorと、保存されたエラー コードが返されます。事前に、関数が呼び出される前にset_last_error(0)0 をスワップインするために呼び出すことができます。LastError

于 2013-08-05T21:50:55.717 に答える