4

最近、ユーザーが非同期/イベント駆動型プログラミングを行えるようにする優れた perl モジュール「AnyEvent」に出くわしました。

正常に動作する次のスニペットを作成しました。私が抱えている問題は、多くのソケットを開いたり閉じたりした後、すべてのクライアントポートをすぐに使い果たしてしまうことです (「netstat -ant」は、20,000 以上のソケットが TIME_WAIT 状態であることを示しています)。

$hdl = new AnyEvent::Handle (
  connect => [$ip, $port],
  on_connect=> sub {
      my ($handle, $host, $port, $tmp) = @_;
      #print "connect routine for $handle->{ue}\r\n";
      #update states.
  },
  on_read => sub {
      my $hdl = $_[0];
      #read data
      #send response.
  });

IO::Socket::INET で TCP ソケットを作成し、AnyEvent::Handle で新しく作成したソケットを使用することは可能でしょうか?

my $sock = IO::Socket::INET->new( Proto    => 'tcp',
                             PeerAddr => $ue->{vars}->{ip},
                             PeerPort => $ue->{vars}->{dstPort},
                             ReusePort => 1,
            KeepAlive => 1
) || die "failed to setup outsock $@\n";
$hdl = new AnyEvent::Handle (
  fh => $sock,
  on_connect=> sub {
      my ($handle, $host, $port, $tmp) = @_;
      #print "connect routine for $handle->{ue}\r\n";
      #update states.
  },
  on_read => sub {
      my $hdl = $_[0];
      #read data
      #send response.
  });

試してみましたが、うまくいきません。提案/コメントをお待ちしております。

それを調べて提案してくれた池上に感謝します。ただし、SO_REUSEADDR が有効になっていないようです。これが私が使用したコードです(彼の提案に基づく)

use strict;
use warnings;

use AnyEvent           qw( );
use AnyEvent::Handle   qw( );
use AnyEvent::Impl::EV qw( );
use AnyEvent::Socket   qw( tcp_connect );
use Socket             qw( SOL_SOCKET SO_REUSEPORT SO_REUSEADDR);

my $ts = 0;
my $trans = 0;
my $currentTS;

sub transaction {
   my ($host, $port) = @_;
   tcp_connect($host, $port, sub {
      my ($sock) = @_
         or die "Can't connect: $!";

      my $handle;
      $handle = AnyEvent::Handle->new(
         fh => $sock,
         on_eof => sub {
            $handle->destroy();
         },
         on_read => sub {
            my ($handle) = @_;
            #print $handle->rbuf();
            $trans ++;
            $currentTS = time();
            if ($currentTS > $ts) {
                $ts = $currentTS;
                print "$trans\n";
            }
            #printf "recved %d bytes of data\n", length($handle->rbuf);
            # This should continue to read until header +
            # Content-Length bytes have been read instead
            # of stopping after one read.
            if (length($handle->rbuf) > 0) {
                $handle->destroy();
            }
         },
      );
      $handle->push_write("GET /s HTTP/1.1\r\nHost: $host\r\n\r\n");
      #$handle->push_shutdown();  # Done writing.
   }, sub {
      my ($sock) = @_;

      #setsockopt($sock, SOL_SOCKET, SO_REUSEPORT, 1) or die $!;
      setsockopt($sock, SOL_SOCKET, SO_REUSEADDR, 1)  or die $!;
        #   die "failed to set linger $!\n";
      return undef;
   });
}
{
   my $cv = AnyEvent->condvar();

   my $t = AnyEvent->timer(after=>0.001, interval=>1, cb=> sub {
      transaction("10.3.0.6", 80 );
   });

   $cv->recv();
}

私のシステムはUbuntu 11.04です。ディレクトリ /proc/sys/net/ipv4 には、次の 2 つのファイルの内容があります。

% 以上の tcp_tw_recycle

1

% 以上の tcp_tw_reuse

1

4

2 に答える 2

5

Windows では が提供されていないため、次のコードを実行できませんが、次SO_REUSEPORTのコードが要求どおりに機能することを確信しています。

そうは言っても、それが役立つかどうかはわかりません。私が読んだSO_REUSEPORTことから、すでにアクティブなポートにバインドできますが、どのポートにもバインドしていません。

use strict;
use warnings;

use AnyEvent           qw( );
use AnyEvent::Handle   qw( );
use AnyEvent::Impl::EV qw( );
use AnyEvent::Socket   qw( tcp_connect );
use Socket             qw( SOL_SOCKET SO_REUSEPORT );

sub transaction {
   my ($host, $port) = @_;
   tcp_connect($host, $port, sub {
      my ($sock) = @_
         or die "Can't connect: $!";

      my $handle;
      $handle = AnyEvent::Handle->new(
         fh => $sock,
         on_eof => sub {
            $handle->destroy();
         },
         on_read => sub {
            my ($handle) = @_;
            print $handle->rbuf();

            # This should continue to read until header +
            # Content-Length bytes have been read instead
            # of stopping after one read.
            $handle->destroy();
         },
      );

      $handle->push_write("GET / HTTP/1.1\r\nHost: $host\r\n\r\n");
      $handle->push_shutdown();  # Done writing.
   }, sub {
      my ($sock) = @_;

      setsockopt($sock, SOL_SOCKET, SO_REUSEPORT, 1)
         or die $!;

      return undef;
   });
}

{
   my $cv = AnyEvent->condvar();

   my $t = AnyEvent->timer(after=>0.001, interval=>0.001, cb=> sub {
      transaction("localhost", $ARGV[0] // die("usage"));
   });

   $cv->recv();
}

テストに使用したサーバー:

use strict;
use warnings;
use 5.010;

use IO::Socket::INET qw( );
use Socket           qw( inet_ntoa );

my $serv = IO::Socket::INET->new(
   Listen => 1,
);

say inet_ntoa($serv->sockaddr) . ":" . $serv->sockport;

while (my $client = $serv->accept()) {
   say "Connection from ".inet_ntoa($client->peeraddr).":".$client->peerport;
   while (<$client>) {
      last if /^(?:\r?\n)?\z/;
   }

   say $client "HTTP/1.1 200 OK\r\n"
      .        "Content-Type: text/plain\r\n"
      .        "\r\n"
      .        "Hello\n";

   say "   done.";
}
于 2012-11-12T10:19:51.653 に答える