現在、このライブラリを使用して、セットアップしたカフカサーバーのストレステストを行っています: https://github.com/dsully/pykafka
import kafka
import time
def test_kafka_server(n=1):
for i in range(0,n):
producer = kafka.producer.Producer('test',host='10.137.8.192')
message = kafka.message.Message(str(time.time()))
producer.send(message)
producer.disconnect()
def main():
test_kafka_server(100000)
if __name__ == '__main__':
main()
結局何が起こっているのかというと、自分のローカル マシンが過負荷になってしまうということです。
エラー 10055が表示されます。これは、Google によると、「一度に開いている接続が多すぎるため、Windows が TCP/IP ソケット バッファを使い果たしました」という意味です。 netstat によると、producer.disconnect() はソケットを閉じているのではなく、TIME_WAIT
状態にしています。
ipython デバッガーは次の行を指します。
C:\Python27\lib\socket.pyc in meth(name, self, *args)
222 proto = property(lambda self: self._sock.proto, doc="the socket protocol")
223
--> 224 def meth(name,self,*args):
225 return getattr(self._sock,name)(*args)
226
犯人として、しかし、これは私が快適であるよりも低いレベルで物事を台無しにするようです.
私は検索して、このPythonソケットが接続を適切に閉じていないことを発見しました。
setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
そのため、io.py ファイルでそのオプションを使用して pykafka lib を再構築しました。
def connect(self):
""" Connect to the Kafka server. """
global socket
self.socket = socket.socket()
self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.socket.connect((self.host, self.port))
それでも同じエラーが発生します。
setsockopt 行を正しい場所に配置していませんか? 他に試してみることはありますか?