12

splice()を使用して複数のソケットに書き込むために、tee()で「マスター」パイプを複製しています。当然、これらのパイプは、宛先ソケットにsplice()できる量に応じて、さまざまなレートで空になります。したがって、次に「マスター」パイプにデータを追加してからもう一度tee()すると、パイプに64KBを書き込むことができるが、「スレーブ」パイプの1つに4KBしか書き込むことができない状況が発生する可能性があります。その場合、すべての「マスター」パイプをソケットにsplice()すると、残りの60KBをそのスレーブパイプにtee()できなくなると推測しています。本当?「unteed」データの先頭に設定したtee_offset(0から開始)を追跡し、それを超えてsplice()しないようにすることができると思います。したがって、この場合、tee_offsetを4096に設定し、それ以上スプライスしないようにします。他のパイプにすべてにティーすることができます。私はここで正しい方向に進んでいますか?私へのヒント/警告はありますか?

4

1 に答える 1

25

私の理解が正しければ、複数のソケットに多重化したいリアルタイムのデータ ソースがあります。データを生成するものに接続された単一の「ソース」パイプがあり、データを送信するソケットごとに「宛先」パイプがあります。あなたがしていることはtee()、ソースパイプから各宛先パイプにデータsplice()をコピーし、それを宛先パイプからソケット自体にコピーするために使用しています。

ここで直面する根本的な問題は、ソケットの 1 つが単に追いつかない場合です。データを送信するよりも速く生成している場合、問題が発生します。これはパイプの使用とは関係ありません。基本的な問題です。したがって、この場合に対処するための戦略を選択することをお勧めします - これらのことは後であなたを噛むことがよくあるため、一般的であるとは思わない場合でも、これを処理することをお勧めします. 基本的な選択肢は、問題のあるソケットを閉じるか、出力バッファーがクリアされるまでデータをスキップすることです。後者の選択は、たとえば、オーディオ/ビデオ ストリーミングに適している可能性があります。

ただし、パイプの使用に関連する問題、Linux ではパイプのバッファーのサイズに柔軟性がないことです。Linux 2.6.11 以降 (このtee()呼び出しは 2.6.17 で追加されました)、デフォルトは 64K です -パイプのマンページを参照してください。2.6.35 以降、この値はF_SETPIPE_SZオプションを使用してfcntl()( fcntl のマンページを参照)、 で指定された制限まで変更できます/proc/sys/fs/pipe-size-maxが、バッファリングは、ユーザー空間で動的に割り当てられたスキームよりもオンデマンドで変更するのが厄介です。これは、遅いソケットに対処する能力がいくらか制限されることを意味します。これが許容できるかどうかは、データを受信して​​送信できると予想される速度によって異なります。

このバッファリング戦略が受け入れられると仮定すると、各宛先パイプがソースから消費したデータの量を追跡する必要があるという仮定は正しく、すべての宛先パイプが消費したデータを破棄することだけが安全です。tee()にはオフセットの概念がないため、これはやや複雑です。パイプの先頭からしかコピーできません。この結果、最も遅いソケットの速度でしかコピーできないということです。これは、データの一部がソースから消費されるまで、宛先パイプへのコピーを使用できないためですtee()。すべてのソケットには、消費しようとしているデータがあります。

これをどのように処理するかは、データの重要性によって異なります。tee()との速度が本当に必要splice()で、遅いソケットが非常にまれなイベントになると確信している場合は、次のようにすることができます (ノンブロッキング IO と単一のスレッドを使用していると仮定しましたが、同様のものが複数のスレッドでも機能します):

  1. すべてのパイプがノンブロッキングであることを確認してください (fcntl(d, F_SETFL, O_NONBLOCK)各ファイル記述子をノンブロッキングにするために使用します)。
  2. read_counter各宛先パイプの変数をゼロに初期化します。
  3. ソースパイプに何かがあるまで待機するには、epoll()のようなものを使用します。
  4. がゼロのすべての宛先パイプをループし、それぞれにデータを転送するためread_counterに呼び出します。必ずフラグtee()を渡してください。SPLICE_F_NONBLOCK
  5. read_counterによって転送された量だけ、宛先パイプごとにインクリメントしtee()ます。最低の結果値を追跡します。
  6. -の最小の結果値を見つけますread_counter- これがゼロでない場合、ソース パイプからその量のデータを破棄します (たとえば、 でsplice()宛先を開いた呼び出しを使用します)。データを破棄した後、すべてのパイプから破棄され/dev/nullた量を引きます(これが最小値であるため、いずれかが負になることはありません)。read_counter
  7. 手順3から繰り返します。

注: 過去につまずいたことの 1 つは、パイプSPLICE_F_NONBLOCKに対するtee()andsplice()操作が非ブロッキングであるかどうかに影響し、O_NONBLOCKyou set withが他の呼び出し (たとえばand )fnctl()との相互作用が非ブロッキングであるかどうかに影響することです。すべてをノンブロッキングにする場合は、両方を設定します。また、ソケットをノンブロッキングにすることを忘れないでください。そうしないと、ソケットにデータを転送するための呼び出しがブロックされる可能性があります (スレッド化されたアプローチを使用している場合は、それが必要な場合を除きます)。read()write()splice()

ご覧のとおり、この戦略には大きな問題があります。1 つのソケットがブロックされるとすぐにすべてが停止します。そのソケットの宛先パイプがいっぱいになり、ソース パイプが停滞します。したがって、ステップ4tee()のreturnの段階に達したら、そのソケットを閉じるか、少なくとも「切断」(つまり、ループから外す) して、他に何も書き込まないようにする必要があります。出力バッファが空になるまで。どちらを選択するかは、データ ストリームがスキップされたビットから回復できるかどうかによって異なります。EAGAIN

ネットワークの遅延をより適切に処理したい場合は、より多くのバッファリングを行う必要があります。これには、ユーザー空間のバッファ (との利点を無効にする) またはディスクベースのバッファが含まれtee()ますsplice()。ディスクベースのバッファリングは、ほぼ確実にユーザー空間のバッファリングよりも大幅に遅くなるため、そもそも選択tee()してから多くの速度が必要になると思われるため、適切ではありませんsplice()が、完全を期すために言及します。

任意の時点でユーザー空間からデータを挿入することになった場合に注目すべきことの 1 つは、vmsplice()呼び出しと同様の方法で、ユーザー空間からパイプへの「出力の収集」を実行できるwritev()呼び出しです。これは、データを複数の異なる割り当てられたバッファーに分割するのに十分なバッファリングを行っている場合 (たとえば、プール アロケーター アプローチを使用している場合) に役立ちます。

最後に、ソケットを使用する「高速」スキームの間でソケットを交換し、それらが追いつかない場合は、より遅いユーザー空間のバッファリングに移動することを想像できtee()ますsplice()。これは実装を複雑にしますが、多数の接続を処理していて、それらのごく一部だけが遅い場合でも、多少関係するユーザー空間へのコピーの量を減らしています。ただし、これは一時的なネットワークの問題に対処するための短期的な手段に過ぎません。最初に述べたように、ソケットがソースよりも遅い場合、根本的な問題が発生します。最終的にバッファリングの制限に達し、データをスキップするか、接続を閉じる必要があります。

全体として、なぜ速度が必要なのか、そしてユースケースでは、メモリまたはディスク上の単純なユーザー空間バッファリングがより適切かどうかを慎重に検討しtee()ますsplice()。ただし、速度が常に高く、制限されたバッファリングが許容されると確信している場合は、上で概説したアプローチが機能するはずです.

また、言及しておくべきことの 1 つは、これによりコードが非常に Linux 固有のものになるということです。これらの呼び出しが他の Unix バリアントでサポートされていることは知りません。sendfile()呼び出しは よりも制限されていsplice()ますが、より移植性が高くなる可能性があります。本当に移植可能にしたい場合は、ユーザー空間のバッファリングに固執してください。

私が取り上げたことで、もっと詳しく知りたいことがあれば教えてください。

于 2013-01-07T17:28:02.910 に答える