9

編集

どうやら私がやりたかったのは倹約の範囲外です...ポートに複数のクライアントが存在しないことを確認すれば、すべて問題ありません。もちろん、応答時間を改善し、オーバーヘッドを減らすために、サーバーに対していくつかの再利用可能な接続を開いてもらいたいので、この種の目的は無効になります。

誰かがこれを達成するための別の方法の提案を持っているなら、それはありがたいです(または私の結論が間違っている場合)

バックグラウンド

私は、ほとんどがthrift(主にjava-> php接続)によって接続されているマルチコンポーネントアプリケーションを持っています。

これまでのところ、すべて問題ないように見えますが、クライアント側が1秒間に数百の要求を起動できるサーブレットであるJava->Java接続を導入しました。

アクセスされているメソッドには、次のインターフェースがあります。

bool pvCheck(1:i32 toolId) throws(1:DPNoToolException nte),

サービス側で奇妙なことではないことを確認するために、実装を簡単なものに置き換えました。

    @Override
    public boolean pvCheck(int toolId) throws TException {
        //boolean ret = api.getViewsAndDec(toolId);
        return true;
    }

エラー/考えられる原因は?

接続が少ない限り問題なく動作しますが、接続が近づくとすぐに、接続がリーダーでスタックし始めます。

それらの1つをデバッガーでプルアップすると、スタックは次のようになります。

Daemon Thread [http-8080-197] (Suspended)   
    BufferedInputStream.read(byte[], int, int) line: 308    
    TSocket(TIOStreamTransport).read(byte[], int, int) line: 126    
    TSocket(TTransport).readAll(byte[], int, int) line: 84  
    TBinaryProtocol.readAll(byte[], int, int) line: 314 
    TBinaryProtocol.readI32() line: 262 
    TBinaryProtocol.readMessageBegin() line: 192    
    DumboPayment$Client.recv_pvCheck() line: 120    
    DumboPayment$Client.pvCheck(int) line: 105  
    Receiver.performTask(HttpServletRequest, HttpServletResponse) line: 157 
    Receiver.doGet(HttpServletRequest, HttpServletResponse) line: 109   
    Receiver(HttpServlet).service(HttpServletRequest, HttpServletResponse) line: 617    
    Receiver(HttpServlet).service(ServletRequest, ServletResponse) line: 717    
    ApplicationFilterChain.internalDoFilter(ServletRequest, ServletResponse) line: 290  
    ApplicationFilterChain.doFilter(ServletRequest, ServletResponse) line: 206  
    StandardWrapperValve.invoke(Request, Response) line: 233    
    StandardContextValve.invoke(Request, Response) line: 191    
    StandardHostValve.invoke(Request, Response) line: 127   
    ErrorReportValve.invoke(Request, Response) line: 102    
    StandardEngineValve.invoke(Request, Response) line: 109 
    CoyoteAdapter.service(Request, Response) line: 298  
    Http11AprProcessor.process(long) line: 859  
    Http11AprProtocol$Http11ConnectionHandler.process(long) line: 579   
    AprEndpoint$Worker.run() line: 1555 
    Thread.run() line: 619  

これは、次の例外が発生したため、データが破損したことが原因のようです。

10/11/22 18:38:55 WARN logger.Receiver: pvCheck had an exception
org.apache.thrift.TApplicationException: pvCheck failed: unknown result
    at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:135)
    at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
    at *.Receiver.performTask(Receiver.java:157)
    at *.Receiver.doGet(Receiver.java:109)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:617)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:717)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:233)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:298)
    at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:859)
    at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:579)
    at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:1555)
    at java.lang.Thread.run(Thread.java:619)

10/11/22 17:59:46 ERROR [/ninja_ar].[Receiver]: サーブレット Receiver のServlet.service()が例外を投げました
java.lang.OutOfMemoryError: Java heap space
    at org.apache.thrift.protocol.TBinaryProtocol.readStringBody(TBinaryProtocol.java:296)
    at org.apache.thrift.protocol.TBinaryProtocol.readString(TBinaryProtocol.java:290)
    at org.apache.thrift.protocol.TBinaryProtocol.readMessageBegin(TBinaryProtocol.java:198)
    at *.thrift.generated.DumboPayment$Client.recv_pvCheck(DumboPayment.java:120)
    at *.thrift.generated.DumboPayment$Client.pvCheck(DumboPayment.java:105)
    at *.Receiver.performTask(Receiver.java:157)
    at *.Receiver.doGet(Receiver.java:109)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:690)
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:803)
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:269)
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:188)
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:210)
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:172)
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:117)
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:108)
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:151)
    at org.apache.coyote.http11.Http11Processor.process(Http11Processor.java:870)
    at org.apache.coyote.http11.Http11BaseProtocol$Http11ConnectionHandler.processConnection(Http11BaseProtocol.java:665)
    at org.apache.tomcat.util.net.PoolTcpEndpoint.processSocket(PoolTcpEndpoint.java:528)
    at org.apache.tomcat.util.net.LeaderFollowerWorkerThread.runIt(LeaderFollowerWorkerThread.java:81)
    at org.apache.tomcat.util.threads.ThreadPool$ControlRunnable.run(ThreadPool.java:685)
    at java.lang.Thread.run(Thread.java:636)

おそらく私はマークから外れていますが、これらは、何も送信されていないときにクライアントが読み続けようとしていることに関連していると確信しています。

いくつかの実装の詳細

サーバーとクライアントの両方がJavaバイナリプロトコルを使用しています。

クライアントを再利用できる単純なクライアントプールクラスを作成しました。これらが主な機能です。

public synchronized Client getClient() {
    if(clientQueue.isEmpty()) {
        return newClient();
    } else {
        return clientQueue.getLast();
    }
}

private synchronized Client newClient() {
    int leftToTry = serverArr.length;
    Client cli = null;
    while(leftToTry > 0 && cli == null) {
        log.info("Creating new connection to " + 
                serverArr[roundRobinPos] + port);
        TTransport transport = new TSocket(serverArr[roundRobinPos], port);
        TProtocol protocol = new TBinaryProtocol(transport);
        cli = new Client(protocol);
        try {
            transport.open();
        } catch (TTransportException e) {
            cli = null;
            log.warn("Failed connection to " + 
                    serverArr[roundRobinPos] + port);
        }

        roundRobinPos++;
        if(roundRobinPos >= serverArr.length) {
            roundRobinPos = 0;
        }
        leftToTry--;
    }

    return cli;
}

public void returnClient(Client cli) {
    clientQueue.addFirst(cli);
}

クライアントアプリケーション(つまり、Tomcatサーブレット)は、次の方法でアクセスします。

    Client dpayClient = null;
    if(dpay != null
            && (dpayClient = dpay.getClient()) != null) {

        try {
            dpayClient.pvCheck(requestParameters.getId());
        } catch (DPNoToolException e) {
            return;
        } catch (TException e) {
            log.warn("pvCheck had an exception", e);
        } finally {
            if(dpayClient != null) {
                dpay.returnClient(dpayClient);
            }
        }
    }

実際の倹約接続は次のようにアップされます

private boolean initThrift(int port, Configuration conf) {
    TProtocolFactory protocolFactory = new TBinaryProtocol.Factory();
    DPaymentHandler handler = new DPaymentHandler(conf);

    DumboPayment.Processor processor = 
        new DumboPayment.Processor(handler);

    InetAddress listenAddress;
    try {
        listenAddress = InetAddress.getLocalHost();
    } catch (UnknownHostException e) {
        LOG.error("Failed in thrift init", e);
        return false;
    }
    TServerTransport serverTransport;
    try {
        serverTransport = new TServerSocket(
                new InetSocketAddress(listenAddress, port));
    } catch (TTransportException e) {
        LOG.error("Failed in thrift init", e);
        return false;
    }

    TTransportFactory transportFactory = new TTransportFactory();
    TServer server = new TThreadPoolServer(processor, serverTransport,
            transportFactory, protocolFactory);

    LOG.info("Starting Dumbo Payment thrift server on " + 
            listenAddress + ":" + Integer.toString(port));
    server.serve();

    return true;
}

ついに

しばらくこれにこだわっています...それは私が明白な何かを逃しているかもしれません。私はこれでどんな助けにも本当に感謝します。

追加情報が必要な場合は、お知らせください。そこには一口がたくさんあるので、私はものを最も(うまくいけば)関連性のあるものに保つようにしたかったのです。

4

3 に答える 3

11

私の推測では、複数のスレッドが同時にクライアントを使用しようとしていると思いますが、その防弾性については完全にはわかりません。非同期インターフェースを使用したり、クライアントにアクセスするためのスレッドセーフなリソースプールを構築したりすることもできます。

Thrift-0.5.0.0を使用して、thriftで生成されたコード用のAsyncClientを作成する例を次に示します。

Factory fac = new AsyncClient.Factory(new TAsyncClientManager(), new TProtocolFactory() {
    @Override
    public TProtocol getProtocol( TTransport trans ) {
        return new TBinaryProtocol(trans);
    }
});
AsyncClient cl = fac.getAsyncClient( new TNonblockingSocket( "127.0.0.1", 12345 ));

ただし、ソースを調べると、NIOソケットを使用している場合でも、シングルスレッドメッセージハンドラーがあることがわかります。これがボトルネックである可能性があります。より多くを取得するには、より多くの非同期クライアントを作成し、それらをチェックアウトして、正しく返す必要があります。

これを単純化するために、私はそれらを管理するための簡単な小さなクラスを作成しました。ニーズに合わせて変更するために必要なのは、パッケージを含めることだけです。これは、実際にはあまりテストしていませんが(まったく)、機能するはずです。

public class Thrift {

    // This is the request
    private static abstract class ThriftRequest {

        private void go( final Thrift thrift, final AsyncClient cli ) {
            on( cli );
            thrift.ret( cli );
        }

        public abstract void on( AsyncClient cli );
    }

    // Holds all of our Async Clients
    private final ConcurrentLinkedQueue<AsyncClient>   instances = new ConcurrentLinkedQueue<AsyncClient>();
    // Holds all of our postponed requests
    private final ConcurrentLinkedQueue<ThriftRequest> requests  = new ConcurrentLinkedQueue<ThriftRequest>();
    // Holds our executor, if any
    private Executor                                 exe       = null;

    /**
     * This factory runs in thread bounce mode, meaning that if you call it from 
     * many threads, execution bounces between calling threads depending on when        
     * execution is needed.
     */
    public Thrift(
            final int clients,
            final int clients_per_message_processing_thread,
            final String host,
            final int port ) throws IOException {

        // We only need one protocol factory
        TProtocolFactory proto_fac = new TProtocolFactory() {

            @Override
            public TProtocol getProtocol( final TTransport trans ) {
                return new TBinaryProtocol( trans );
            }
        };

        // Create our clients
        Factory fac = null;
        for ( int i = 0; i < clients; i++ ) {

            if ( fac == null || i % clients_per_message_processing_thread == 0 ) {
                fac = new AsyncClient.Factory(
                    new TAsyncClientManager(),
                    proto_fac );
            }

            instances.add( fac.getAsyncClient( new TNonblockingSocket(
                host,
                port ) ) );
        }
    }
    /**
     * This factory runs callbacks in whatever mode the executor is setup for,
     * not on calling threads.
     */
    public Thrift( Executor exe,
            final int clients,
            final int clients_per_message_processing_thread,
            final String host,
            final int port ) throws IOException {
        this( clients, clients_per_message_processing_thread, host, port );
        this.exe = exe;
    }

    // Call this to grab an instance
    public void
            req( final ThriftRequest req ) {
        final AsyncClient cli;
        synchronized ( instances ) {
            cli = instances.poll();
        }
        if ( cli != null ) {
            if ( exe != null ) {
                // Executor mode
                exe.execute( new Runnable() {

                    @Override
                    public void run() {
                        req.go( Thrift.this, cli );
                    }

                } );
            } else {
                // Thread bounce mode
                req.go( this, cli );
            }
            return;
        }
        // No clients immediately available
        requests.add( req );
    }

    private void ret( final AsyncClient cli ) {
        final ThriftRequest req;
        synchronized ( requests ) {
            req = requests.poll();
        }
        if ( req != null ) {
            if ( exe != null ) {
                // Executor mode
                exe.execute( new Runnable() {

                    @Override
                    public void run() {
                        req.go( Thrift.this, cli );
                    }
                } );
            } else {
                // Thread bounce mode
                req.go( this, cli );
            }
            return;
        }
        // We did not need this immediately, hold onto it
        instances.add( cli );

    }

}

使用方法の例:

// Make the pool
Thrift t = new Thrift( 10, "localhost", 8000 );
// Use the pool
t.req( new ThriftRequest() {

    @Override
    public void on( AsyncClient cli ) {
        cli.MyThriftMethod( "stringarg", 111, new AsyncMethodCallback<AsyncClient.MyThriftMethod_call>() {
            @Override
            public void onError( Throwable throwable ) {
                }

            @Override
            public void onComplete( MyThriftMethod_call response ) {
            }
        });
    }
} );

THsHaServerなどのさまざまなサーバーモードを試して、ご使用の環境に最適なものを確認することをお勧めします。

于 2011-01-20T03:55:47.553 に答える
0

次の行でTSocketではなくTHttpClientを使用してみてください。

TTransport transport = new TSocket(serverArr[roundRobinPos], port);
于 2013-04-19T01:27:27.253 に答える
-1

ClientPoolの関数returnClientはスレッドセーフではありません:

public void returnClient(Client cli) {
    clientQueue.addFirst(cli);
}
于 2015-05-10T14:44:18.540 に答える