1

回線ごとに処理する必要のあるストリーミングデータを送信するサーバーに接続しています。したがって、個々の行を解析してから、各行を処理する必要があります。次のコードは問題なく機能しているように見えますが、この種のことを行うための標準的なデザインパターンがあるかどうか疑問に思っています。それともこれが進むべき道ですか?

キューは深刻なオーバーヘッドをもたらしますか?できるだけ速く効率的にする必要があります。そのため、ツイストのようにライブラリから離れました。

import socket, multiprocessing

def receive_proc(s, q):
    data = ''
    while True:
        data += s.recv(4096)
        if '\n' in data:
            lines = data.split('\n')[:-1]
            for line in lines:
                if len(line) > 0:
                    q.put(line)
                    data = data.replace(line+'\n', '', 1)

q = multiprocessing.Queue()
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('127.0.0.1', 1234))

p = multiprocessing.Process(target=receive_proc, args=(s,q))
p.start()

while True:
    line = q.get()

    # do your processing here
4

1 に答える 1

4

ねじれのようなものから離れたいという正当な理由は確かにありますが、効率はそれらの中にあるとは思いません-それらは正しい方法で最適化される可能性が高いと思います。パフォーマンスはトリッキーな獣であり、多くの場合、ボトルネックは実際には思った場所にありません。そのため、適切に最適化する前にプロファイルを作成する必要があります。たとえば、フレームワークは、パフォーマンスを確実に向上させるC拡張機能にコードをプッシュする努力をした可能性があります。パフォーマンスが重要な場合動機付け、サードパーティのものはおそらくより安全なオプションです。また、他の人がさまざまなユースケースや環境でテストおよび調整したコードを使用することには大きな議論があります-ホイールの多くを再発明することになった場合、いくつかのスポークが欠落する可能性があるというリスクが常にあります。

ただし、実行する必要があることは非常に単純なように思われるため、フレームワークのインストールと学習、およびコードへの別のランタイム依存関係の追加のオーバーヘッドは正当化されない場合があります。また、主にIOバウンドである場合、処理を行うために少し余分なCPUを消費しても、とにかく大きな違いはありません。自分で書くほうが(時間的には)速く、パフォーマンスも「十分」だとわかっていたからといって、過去にねじれたようなことは避けてきました。ツイストのコールバックシステムでは、デバッグが少し難しいことにいつも気づいています。たとえば、エラーメッセージにアクセスするのは少し面倒です。それは決して不可能ではなく、多くの人がそれを非常にうまく使用していますが、個人的には、単純なタスクには正当化できないほど「面倒」であることがわかりました。

この場合、受信と処理を独自のプロセスに分割するというあなたの考えは誤った経済である可能性があると思います-ソケットからのデータの受信は非常に高速であり、純粋なPythonで大量の処理を行う場合は、それが支配的である可能性がありますパフォーマンスファクター。しかし、あなたがどのような処理をしているのかを知らなければ、はっきりとは言えません。時間とCPUを集中的に使用し、各行を前の行とは独立して処理できる場合は、おそらく合理的ですが、処理をワーカープロセスのセット全体にファームアウトすることをお勧めします。これは、既存のコードに基づいて非常に簡単です。メインプロセスを「スレーブ」ではなくレシーバーにして、すべてが共有するワーカーのプールを作成するだけです。Queue。各ワーカーは、次のアイテムを選択して結果を生成するループを通過します。それぞれにかかる時間は関係ありません。次のアイテムが利用可能になると、次のアイテムを取得します(そしてそれQueueを処理します)。

ただし、処理ループも主にIOバウンド(ファイルへの書き込みなど)である場合は、すべてをパイプにプッシュするオーバーヘッドよりも、実際には単一のプロセスの方が優れていることがわかります。これは、CPUアーキテクチャを含む多くの要因に依存します(一部のシステムでは、CPUコア間の転送が他のシステムよりも高価になります)が、パフォーマンスが向上するという確信がない限り、最終的には複数のプロセスを使用したくありません。

とにかく、ループIOバウンドである場合は、非ブロッキングIOを備えた単一のプロセスが進むべき道であることがわかるかもしれません。Pythonのselectモジュールを使用してこれを自分で行うことも、eventletgeventなどのライブラリを使用するとよりクリーンになる場合があります。

無関係ですが、バッファから開始点を取り除く方法は非常に非効率的です。使用する必要はありません。次のようにreplace()、既存のものを使用できます。split()

while True:
    data += s.recv(4096)
    if '\n' in data:
        lines = data.split('\n')
        for line in lines[:-1]:
            if len(line) > 0:
                q.put(line)
        data = lines[-1]
于 2013-01-24T14:29:30.983 に答える