3

AnyEvent::Handleと を活用tcp_connectして記述した単純な TCP サーバーとクライアントがありますtcp_server。クライアントはサーバーに接続し、Test Message5 秒ごとに文字列を送信します。

これは、サーバーに到達できる場合は問題なく機能しますが、クライアントの起動時にサーバーが利用できない場合、または利用できなくなった場合、クライアント スクリプトは再接続を試みません。

接続ハンドルが利用できない場合 (破棄された場合) は、再接続を試みてください。利用できない場合は、何かを行います (おそらくステータス メッセージを出力します) が、理想的な結果は 5 秒ごとに再接続することです。

どうすればいいのかわかりません。クライアントとサーバーのコードを次のように簡素化しました。

クライアント

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

my @bulk;

# Start Timer
my $timer = AnyEvent->timer(
    after    => 5,
    interval => 5,
    cb       => sub {
        push( @bulk, "Test message" );
        flush( \@bulk );
        undef @bulk;
    } );

my $host = '127.0.0.1';
my $port = 9999;

my $conn_cv = AnyEvent->condvar;
my $conn_hdl;

$conn_hdl = AnyEvent::Handle->new(
    connect          => [$host, $port],
    keepalive        => 1,
    on_connect_error => sub {
        print "Could not connect: $_[1]\n";
        $conn_hdl->destroy;

        #$conn_cv->send;
    },
    on_error => sub {
        my ( $out_hdl, $fatal, $msg ) = @_;
        AE::log error => $msg;
        $conn_hdl->destroy;

        #$conn_cv->send;
    },
    on_read => sub {
        my ( $self ) = @_;
        $self->unshift_read(
            line => sub {
                my ( $hdl, $data ) = @_;
                print $data. "\n";
            } );
    } );

$conn_cv->recv;

# Flush array of events
sub flush {
    my ( $bulk ) = @_;
    return 0 if scalar @{$bulk} == 0;

    my $output = join( ",", @{$bulk} );
    $output = compress( $output );
    my $l = pack( "N", length( $output ) );
    $output = $l . $output;
    $conn_hdl->push_write( $output );
}

サーバ

#!/usr/bin/perl

use strict;
use warnings;

use AnyEvent;
use AnyEvent::Handle;
use AnyEvent::Socket;
use Compress::Zlib;

my %holding;

my $host = '127.0.0.1';
my $port = 9999;

my %connections;

# Start Timer
my $timer = AnyEvent->timer(
    after    => 5,
    interval => 5,
    cb       => sub {
        print "Number of connected hosts: ";
        print scalar keys %connections;
        print "\n";
        foreach my $k ( keys %connections ) {
            delete $connections{$k} if $connections{$k}->destroyed;
        }
    } );

my $server_cv = AnyEvent->condvar;
my $server    = tcp_server(
    $host, $port,
    sub {
        my ( $fh, $h, $p ) = @_;
        my $handle;

        $handle = AnyEvent::Handle->new(
            fh        => $fh,
            poll      => 'r',
            keepalive => 1,
            on_read   => sub {
                my ( $self ) = @_;

                # Get Length Header
                $self->unshift_read(
                    chunk => 4,
                    sub {
                        my $len = unpack( "N", $_[1] );

                        # Get Data
                        $self->unshift_read(
                            chunk => $len,
                            sub {
                                my $data = $_[1];
                                $data = uncompress( $data );
                                print $data. "\n";
                            } );
                    } );

            },
            on_eof => sub {
                my ( $hdl ) = @_;
                $hdl->destroy();
            },
            on_error => sub {
                my ( $hdl ) = @_;
                $hdl->destroy();
            },
        );

        $connections{ $h . ':' . $p } = $handle;    # keep it alive.
    } );

$server_cv->recv;
4

2 に答える 2

1

以下を使用できます。

package MyConnector;

use strict;
use warnings;

use AE               qw( );
use AnyEvent::Handle qw( );
use Scalar::Util     qw( );

sub new {
   my $class = shift;
   my %opts = @_;

   my $self = bless({}, $class);

   {
      Scalar::Util::weaken(my $self = $self);

      my $on_connect       = delete($opts{on_connect});
      my $on_connect_error = delete($opts{on_connect_error});

      my $tries    = delete($opts{tries})    ||  5;
      my $cooldown = delete($opts{cooldown}) || 15;

      $self->{_connect} = sub {
         $self->{_timer} = undef;

         $self->{_handle} = AnyEvent::Handle->new(
            %opts,

            on_connect => sub {
               my ($handle, $host, $port, $retry) = @_;

               $self->{handle} = $handle;
               delete @{$self}{qw( _connect _handle _timer )};

               $on_connect->($handle, $host, $port, $retry)
                  if $on_connect;
            },

            on_connect_error => sub {
               my ($handle, $message) = @_;

               if (!$tries--) {
                  $on_connect_error->($handle, $message)
                     if $on_connect_error;

                  delete @{$self}{qw( _connect _handle _timer )};

                  return;
               }

               # This will happen when this callback returns,
               # but that might not be for a while, so let's
               # do it now in case it saves resources.
               $handle->destroy();

               $self->{_timer} = AE::timer($cooldown, 0, $self->{_connect});
            },
         );
      };

      $self->{_connect}->();
   }

   return $self;
}

sub handle {
   my ($self) = @_;
   return $self->{handle};
}

1;

(あなたのコードとは異なり)メモリリークがないことは間違いありません。次のように使用します。

use strict;
use warnings;

use AE          qw( );
use MyConnector qw( );

my $host = $ARGV[0] || 'www.stackoverflow.com';
my $port = $ARGV[1] || 80;

my $conn_cv = AE::cv();

my $connector = MyConnector->new(
   connect   => [ $host, $port ],
   keepalive => 1,

   on_connect => sub {
       print("Connected successfully\n");
       $conn_cv->send();
   },

   on_connect_error => sub {
       warn("Could not connect: $_[1]\n");
       $conn_cv->send();
   },

   # ...
);

$conn_cv->recv();
于 2015-06-22T05:30:51.573 に答える
0

ikegami のおかげで、接続ステータスを追跡し、それを別の AnyEvent タイマー ウォッチャーと組み合わせて使用​​し、接続が確立されていない場合に再接続することを考えました。これにより、接続ステータス ($isConnected) がゼロの場合、毎秒接続が試行されます。その間、イベントは接続が再確立されたときのためにキューに入れられます。

これを達成するためのより簡単な方法があれば、私はすべて耳にしますが、今のところ、これで問題が解決すると思います。

my @bulk;
my $host = '127.0.0.1';
my $port = 9999;
my $isConnected = 0;

my $conn_cv = AnyEvent->condvar;
my $conn_hdl;

# Flush Timer
my $timer = AnyEvent->timer(
    after => 5,
    interval => 5,
    cb => sub {
        push(@bulk,"Test message");
        if ($isConnected == 1) {
            flush(\@bulk);
            undef @bulk;
        }   
    }
);

# Reconnect Timer
my $reconn = AnyEvent->timer(
    after => 1,
    interval => 1,
    cb => sub {

        if ($isConnected == 0) {

            $conn_hdl = AnyEvent::Handle->new(
                connect => [$host, $port],
                keepalive => 1,
                on_connect => sub {
                    $isConnected = 1;
                },
                on_connect_error => sub {
                    warn "Could not connect: $_[1]\n";
                    $conn_hdl->destroy;
                    $isConnected = 0;       
                },
                on_error => sub {
                    my ($out_hdl, $fatal, $msg) = @_;
                    AE::log error => $msg;
                    $conn_hdl->destroy;
                    $isConnected = 0;
                },
                on_eof => sub {
                    warn "EOF\n";
                    $conn_hdl->destroy;
                    $isConnected = 0;
                },
                on_read => sub {
                    my ($self) = @_;
                        $self->unshift_read(line => sub {
                            my ($hdl,$data) = @_;
                            print $data."\n";
                    });
                }
            );  
        }
    }
);

$conn_cv->recv;
于 2015-06-22T15:59:34.010 に答える