1

新しいファイルのディレクトリを (libinotify を使用して) 監視し、新しいファイルごとに何らかの処理を行ってからストレージ サーバーにコピーする Python スクリプトを開発しています。NFS マウントを使用していましたが、いくつかのパフォーマンスの問題があり、現在 FTP でテストしています。FTP は nfs よりもはるかに少ないリソースを使用しているようです (負荷は常に 2 未満であり、nfs では 5 を超えていました)。

私たちが現在抱えている問題は、TIME_WAIT 状態で開いたままにしておく接続の量です。ストレージには、待機時間で約 15k 接続のピークがあります。

新しい転送に以前の接続を再利用する方法があるかどうか疑問に思っていました.

それを行う方法があるかどうか誰でも知っていますか?

ありがとう

4

2 に答える 2

1

これは、前のコメントへのコメントに基づいた新しい回答です。

単一の TCP ソケットを使用し、すべて 1 つの大きなストリームで、各ファイルの名前と内容をnetstringsとして交互に送信することで、各ファイルを送信します。

私は Python 2.6 を想定しており、両側のファイルシステムが同じエンコーディングを使用しており、多数の同時クライアントを必要としない (ただし、実際のクライアントとテスターなど、場合によっては 2 つ必要になる場合もあります)。 . また、メソッドが に登録し、通知をキューに入れ、それらを 1 つずつ s するモジュールがfilegeneratorあると仮定しています。generate()inotifyyield

client.py:

import contextlib
import socket
import filegenerator

sock = socket.socket()
with contextlib.closing(sock):
    sock.connect((HOST, 12345))
    for filename in filegenerator.generate():
        with open(filename, 'rb') as f:
            contents = f.read()
            buf = '{0}:{1},{2}:{3},'.format(len(filename), filename, 
                                            len(contents), contents)
            sock.sendall(buf)

サーバー.py:

import contextlib
import socket
import threading

def pairs(iterable):
    return zip(*[iter(iterable)]*2)

def netstrings(conn):
    buf = ''
    while True:
        newbuf = conn.recv(1536*1024) 
        if not newbuf:
            return
        buf += newbuf
        while True:
            colon = buf.find(':')
            if colon == -1:
                break
            length = int(buf[:colon])
            if len(buf) >= colon + length + 2:
                if buf[colon+length+1] != ',':
                    raise ValueError('Not a netstring') 
                yield buf[colon+1:colon+length+1]
                buf = buf[colon+length+2:]

def client(conn):
    with contextlib.closing(conn):
        for filename, contents in pairs(netstrings(conn)):
            with open(filename, 'wb') as f:
                f.write(contents)

sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
with contextlib.closing(sock):
    sock.bind(('0.0.0.0', 12345))
    sock.listen(1)
    while True:
        conn, addr = sock.accept()
        t = threading.Thread(target=client, args=[conn])
        t.daemon = True
        t.start()

Windows で約 200 を超えるクライアント、Linux および BSD (Mac を含む) で 100 を超えるクライアント、あまり良くないプラットフォームで 10 を超えるクライアントが必要な場合は、Linux で BSD を使用して、スレッド化された設計ではなくイベント ループ設計を使用することをお勧めしますepollkqueue、および Windows の IO 完了ポート。これは大変なことですが、幸いなことに、すべてをまとめてくれるフレームワークがあります。2 つの一般的な (そして非常に異なる) 選択肢はTwistedgeventです。

特に優れている点の1 つgeventは、現在スレッド化されたコードを記述できることです。いくつかの簡単な変更を加えるだけで、魔法のようにイベント ベースのコードに変換できます。

一方、最終的にイベントベースのコードが必要になる場合は、最初からフレームワークを学習して使用acceptする方がよいでしょう。recv完全なメッセージが表示され、正常にシャットダウンするなど、気になる部分を書き出すだけです。結局のところ、上記のコードの半分以上は基本的に、すべてのサーバーが共有するもののボイラープレートです。


コメントで、あなたは次のように述べました。

また、ファイルはバイナリであるため、クライアントのエンコーディングがサーバーのエンコーディングと異なる場合、問題が発生する可能性があります。

各ファイルをバイナリ モード ('rb'および'wb') で開き、バイナリ文字列を文字として解釈したり、埋め込まれた NUL 文字を EOF などとして処理したりせずに処理できるプロトコル (ネット文字列) を意図的に選択したことに注意してください。そして、私が使用している間、Python 2.x では、文字列をフィードするか、ロケールベースのフォーマット タイプを指定しstr.formatない限り、暗黙のエンコーディングは行われません。どちらも行っていません。(3.x では、代わりにunicodeを使用する必要があることに注意してください。これにより、コードが少し変更されます。)bytesstr

つまり、クライアントとサーバーのエンコーディングは含まれません。FTP の I モードとまったく同じバイナリ転送を行っています。


しかし、反対に、テキストを転送してターゲット システム用に自動的に再エンコードしたい場合はどうでしょうか。これを行うには、次の 3 つの簡単な方法があります。

  1. クライアントのエンコーディングを (先頭に 1 回、またはファイルごとに 1 回) 送信し、サーバーでクライアントからデコードして、ローカル ファイルに再エンコードします。
  2. ソケットを含め、すべてをテキスト/ユニコード モードで実行します。これはばかげており、2.x でも同様に実行するのは困難です。
  3. ワイヤ エンコーディングを定義します (UTF-8 など)。クライアントは、ファイルのデコードと送信用の UTF-8 へのエンコードを担当します。サーバーは、受信時に UTF-8 をデコードし、ファイルをエンコードします。

3 番目のオプションを使用すると、ファイルがデフォルトのファイルシステム エンコーディングになると仮定すると、変更されたクライアント コードは次のようになります。

with io.open(filename, 'r', encoding=sys.getfilesystemencoding()) as f:
    contents = f.read().encode('utf-8')

そしてサーバー上で:

with io.open(filename, 'w', encoding=sys.getfilesystemencoding()) as f:
    f.write(contents.decode('utf-8'))

またio.open、関数はデフォルトでユニバーサル改行を使用するため、クライアントは何でも Unix スタイルの改行に変換し、サーバーは独自のネイティブの改行タイプに変換します。

FTP の T モードは、実際には再エンコードを行わないことに注意してください。改行変換 (およびそのより限定されたバージョン) のみを行います。

于 2013-07-05T06:45:29.657 に答える
0

はい、 との接続を再利用できますftplib。あなたがしなければならないのは、それらを閉じずに使い続けることだけです。

たとえば、メソッドが に登録され、通知をキューに入れ、それらを 1 つずつ s するモジュールfilegeneratorがあるとします。generate()inotifyyield

import ftplib
import os
import filegenerator

ftp = ftplib.FTP('ftp.example.com')
ftp.login()
ftp.cwd('/path/to/store/stuff')

os.chdir('/path/to/read/from/')

for filename in filegenerator.generate():
    with open(filename, 'rb') as f:
        ftp.storbinary('STOR {}'.format(filename), f)

ftp.close()

私はこれに少し混乱しています:

私たちが現在抱えている問題は、TIME_WAIT 状態で開いたままにしておく接続の量です。

問題は、ファイルごとに新しい接続を作成することではなく、古い接続を閉じないことです。その場合、解決策は簡単です。それらを閉じるだけです。


それか、またはそれらすべてを並行して実行しようとしていますが、それがあなたがしていることに気づいていません。

いくらかの並列処理が必要であるが無制限にそうではない場合は、簡単に作成できます。たとえば、それぞれが開いているftplib接続を持ち、それぞれがキューから読み取り、inotifyそのキューにプッシュされたばかりのスレッドを持つ 4 つのスレッドのプールを作成できます。

于 2013-05-06T19:02:45.087 に答える