5

現在、このライブラリを使用して、セットアップしたカフカサーバーのストレステストを行っています: https://github.com/dsully/pykafka

import kafka
import time

def test_kafka_server(n=1):
    for i in range(0,n):
        producer = kafka.producer.Producer('test',host='10.137.8.192')
        message = kafka.message.Message(str(time.time()))
        producer.send(message)
        producer.disconnect()

def main():
    test_kafka_server(100000)

if __name__ == '__main__':
    main()

結局何が起こっているのかというと、自分のローカル マシンが過負荷になってしまうということです。

エラー 10055が表示されます。これは、Google によると、「一度に開いている接続が多すぎるため、Windows が TCP/IP ソケット バッファを使い果たしました」という意味です。 netstat によると、producer.disconnect() はソケットを閉じているのではなく、TIME_WAIT状態にしています。

ipython デバッガーは次の行を指します。

C:\Python27\lib\socket.pyc in meth(name, self, *args)
    222     proto = property(lambda self: self._sock.proto, doc="the socket protocol")
    223 
--> 224 def meth(name,self,*args):
    225     return getattr(self._sock,name)(*args)
    226 

犯人として、しかし、これは私が快適であるよりも低いレベルで物事を台無しにするようです.

私は検索して、このPythonソケットが接続を適切に閉じていないことを発見しました。

setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

そのため、io.py ファイルでそのオプションを使用して pykafka lib を再構築しました。

  def connect(self):
    """ Connect to the Kafka server. """
    global socket
    self.socket = socket.socket()
    self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    self.socket.connect((self.host, self.port))

それでも同じエラーが発生します。

setsockopt 行を正しい場所に配置していませんか? 他に試してみることはありますか?

4

1 に答える 1

6

あなたが説明しているのは、ソケットレベルでの通常の TCP 動作です。ユーザーレベルのプログラムがソケットを閉じても、カーネルはすぐにはソケットを解放しません。TIME_WAIT 状態に入ります。

TIME-WAIT (サーバーまたはクライアントのいずれか) は、リモート TCP が接続終了要求の確認応答を受信したことを確認するのに十分な時間が経過するのを待機することを表します。[RFC 793 によると、MSL (セグメントの最大有効期間) として知られる最大 4 分間、接続は TIME-WAIT 状態に留まることができます。

したがって、ソケットは閉じられます。socket.SO_REUSEADDR はリスナー (サーバー) 用であり、クライアント接続には影響しません。まあ、ソケットをバインドするときに実際に使用されます。

于 2012-11-08T12:22:51.223 に答える