2

単純な ssh サーバーを使用して ssh ブルートフォースの試みをキャプチャしようとしています。コードは次のとおりで、動作しますが、ユーザー名とパスワードの組み合わせを元の IP アドレスと一致させることができません。

これはこれまでのところ簡単です。ログは記録せず、stdout に出力するだけです。buildProtocol 定義を使用して、新しい接続が作成されたときにその IP アドレスを出力しました。ただし、異なるクライアントからの複数の ssh ブルートフォース試行を同時に追跡できるように、ユーザー名とパスワードの資格情報と共に IP アドレスを取得したいと考えています。現状では、接続が確立されたときにしか IP アドレスを取得できないため、複数の接続を同時に追跡することはできません。

明確にするために、接続または接続について言及するとき、ログインの成功について言及しているのではありません。これは私のコードでは不可能であり、意図したものではありません。再接続する前にパスワードを 3 回試行できる ssh サーバーへの単一接続について言及しています。

from zope.interface import implements
from twisted.conch.unix import UnixSSHRealm
from twisted.cred import portal
from twisted.cred.credentials import IUsernamePassword
from twisted.cred.checkers import ICredentialsChecker
from twisted.cred.error import UnauthorizedLogin
from twisted.conch.ssh import factory, userauth, keys, session
from twisted.internet import reactor, defer

with open('id_rsa') as privateBlobFile:
    privateKey = privateBlobFile.read()

with open('id_rsa.pub') as publicBlobFile:
    publicKey = publicBlobFile.read()

class FailDB:
    credentialInterfaces = IUsernamePassword, implements(ICredentialsChecker)

    def requestAvatarId(self, credentials):
        print"%s:%s" % ( credentials.username, credentials.password)
        return defer.fail(UnauthorizedLogin("invalid password"))

class UnixSSHdFactory(factory.SSHFactory):
    publicKeys = {
        'ssh-rsa': keys.Key.fromString(data=publicKey)
    }
    privateKeys = {
        'ssh-rsa': keys.Key.fromString(data=privateKey)
    }
    services = {
        'ssh-userauth': userauth.SSHUserAuthServer
    }

    def buildProtocol(self, addr):
        print addr
        return factory.SSHFactory.buildProtocol(self, addr)

if __name__ == '__main__':
    portal = portal.Portal(UnixSSHRealm())
    portal.registerChecker(FailDB())
    UnixSSHdFactory.portal = portal
    reactor.listenTCP(2022, UnixSSHdFactory())
    reactor.run()

出力例:

IPv4Address(TCP, '127.0.0.1', 42141)
root:password123
root:123456
root:letmein
4

1 に答える 1

0

メソッドから派生させSSHUserAuthServerてオーバーライドすることにより、失敗したログイン試行からアドレスを取得できます_ebBadAuth。例:

class AuthServer(userauth.SSHUserAuthServer):
    def _ebBadAuth(self, reason):
        addr = self.transport.getPeer().address.host
        print("addr {} failed to log in as {} using {}"
              .format(addr, self.user, self.method))
        userauth.SSHUserAuthServer._ebBadAuth(self, reason)

接続の成功も許可したい場合は、 をオーバーライドせずにSSHFactory.services、 のような派生クラスで更新するだけで済みますfactory.services.update({'ssh-userauth': AuthServer})

于 2015-02-10T20:12:05.530 に答える