3

sr() または srp() 関数を使用する場合 - Scapy は、受信したパケットが送信したパケットに対する応答であることをどのように認識しますか?

BACNet を模倣するカスタム プロトコルを作成しました。WHO_IS パケットを BACNet デバイスに送信すると、デバイスは I_AM パケットで応答します。これはレイヤー バインディングのために正しく逆アセンブルされていますが、sr 関数はそれを応答として認識しません。Scapy がこれらのパケットを回答として受け入れるようにするにはどうすればよいですか?

更新: それは私のレイヤー クラスがどのように見えるかです。メソッドは問題ないように見えますanswers()が、まだ機能しません。answers()メソッドを実装するときに誤解した可能性があるものはありますか? 私の理解selfでは、レイヤー自体のクラスを指しother、問題の受信パケットです。したがって、ペイロードを次の上位層に渡すために、次の上位層クラスでメソッドを渡しother.payload、呼び出します。answers()のようにレイヤーが重ねられてEther/IP/UDP/BVLC/NPDU/APDUいます。

class BVLC(Packet):

    name = "BVLC"
    fields_desc = [
       # many fields
                   ]

    def post_build(self, pkt, pay):
        if self.length == None:                
            pkt = pkt[0:2] + struct.pack('!H', len(pay)+len(pkt)) + pkt[4:]  
        return pkt+pay

    def answers(self, other):
        if not isinstance(other,BVLC):
            return 0
        else:
            return NPDU.answers(other.payload)


class NPDU(Packet):

    name = "NPDU"
    fields_desc = [ 
        # again, many fields
                   ]

    def answers(self, other):
        if not isinstance(other,NPDU):
            return 0
        else:
            return APDU.answers(other.payload)

class APDU(Packet):

    name = "APDU"
    fields_desc = [
          # many fields and the only relevant field in this case
            ConditionalField(
                ByteEnumField(
                    "serviceChoice", None, APDU_Service_Unconfirmed.revDict()),
                    lambda pkt: pkt.pduType == APDU_Type.CONFIRMED_SERVICE_REQUEST or
                                pkt.pduType == APDU_Type.UNCONFIRMED_SERVICE_REQUEST), 
                   ]

    def answers(self, other):
        if not isinstance(other,APDU):
            return 0
        else:
            if self.serviceChoice == APDU_Service_Unconfirmed.WHO_IS and\
               other.serviceChoice == APDU_Service_Unconfirmed.I_AM:
                return 1
        return 0
4

1 に答える 1

4

Scapyは、送信された各パケットをsr呼び出すだけです。したがって、レイヤーrecv_packet.answers(sent_packet)に実装する必要があります。次を参照してください。def answers()

def answers(self, other):
    """DEV: true if self is an answer from other"""
    if other.__class__ == self.__class__:
        return self.payload.answers(other.payload)
    return 0

TCP元のレイヤーの抜粋を次に示します。

def answers(self, other):
    if not isinstance(other, TCP):
        return 0
    if conf.checkIPsrc:
        if not ((self.sport == other.dport) and
                (self.dport == other.sport)):
            return 0
    if (abs(other.seq-self.ack) > 2+len(other.payload)):
        return 0
    return 1
于 2015-10-16T17:56:04.770 に答える