2

Apache HTTPClient 4を使用して、デフォルトレベルのアクセスでTwitterのストリーミングAPIに接続しています。最初は完全に機能しますが、データを取得して数分後、次のエラーが発生します。

2012-03-28 16:17:00,040 DEBUG org.apache.http.impl.conn.SingleClientConnManager: Get connection for route HttpRoute[{tls}->http://myproxy:80->https://stream.twitter.com:443]
2012-03-28 16:17:00,040 WARN com.cloudera.flume.core.connector.DirectDriver: Exception in source: TestTwitterSource
java.lang.IllegalStateException: Invalid use of SingleClientConnManager: connection still allocated.
    at org.apache.http.impl.conn.SingleClientConnManager.getConnection(SingleClientConnManager.java:216)
Make sure to release the connection before allocating another one.
    at org.apache.http.impl.conn.SingleClientConnManager$1.getConnection(SingleClientConnManager.java:190)

私はこの問題に直面している理由を理解しています。このHttpClientを水路クラスターで水路ソースとして使用しようとしています。コードは次のようになります。

public Event next() throws IOException, InterruptedException {

    try {

        HttpHost target = new HttpHost("stream.twitter.com", 443, "https");
        new BasicHttpContext();
        HttpPost httpPost = new HttpPost("/1/statuses/filter.json");
        StringEntity postEntity = new StringEntity("track=birthday",
                "UTF-8");
        postEntity.setContentType("application/x-www-form-urlencoded");
        httpPost.setEntity(postEntity);
        HttpResponse response = httpClient.execute(target, httpPost,
                new BasicHttpContext());
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                response.getEntity().getContent()));
        String line = null;
        StringBuffer buffer = new StringBuffer();
        while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if(buffer.length()>30000) break;
        }
        return new EventImpl(buffer.toString().getBytes());
    } catch (IOException ie) {
        throw ie;
    }

}

応答ストリーム内の30,000文字をStringBufferにバッファリングし、これを受信したデータとして返そうとしています。私は明らかに接続を閉じていませんが、まだ接続を閉じたくないと思います。Twitterの開発ガイドはこれについてここで話しますそれは読みます:

一部のHTTPクライアントライブラリは、サーバーによって接続が閉じられた後にのみ応答本文を返します。これらのクライアントは、ストリーミングAPIにアクセスするためには機能しません。応答データを段階的に返すHTTPクライアントを使用する必要があります。最も堅牢なHTTPクライアントライブラリがこの機能を提供します。たとえば、ApacheHttpClientはこのユースケースを処理します。

HttpClientが応答データを段階的に返すことを明確に示しています。例とチュートリアルを確認しましたが、これに近いものは見つかりませんでした。httpclient(apacheでない場合)を使用して、TwitterのストリーミングAPIを段階的に読んだことがある場合は、この偉業をどのように達成したかをお知らせください。まだお持ちでない方は、お気軽にご回答ください。TIA。

アップデート

これを試してみました:1)ストリームハンドルの取得を水路ソースのopenメソッドに移動しました。2)単純な入力ストリームを使用し、データをバイトバッファに読み込みます。メソッド本体は次のようになります。

        byte[] buffer = new byte[30000];

        while (true) {
            int count = instream.read(buffer);
            if (count == -1)
                continue;
            else
                break;
        }
        return new EventImpl(buffer);

これはある程度機能します-私はツイートを受け取ります、それらは目的地にうまく書かれています。問題は、instream.read(buffer)の戻り値にあります。ストリームにデータがなく、バッファにデフォルトの\ u0000バイトと30,000バイトがある場合でも、この値は宛先に書き込まれます。したがって、宛先ファイルは次のようになります。 "tweets..tweets..tweeets .. \ u0000 \ u0000 \ u0000 \ u0000 \ u0000 \ u0000 \ u0000 ...tweets..tweets..."。カウントが-1cozを返さないことを理解しています。これは終わりのないストリームです。したがって、読み取りコマンドからバッファーに新しいコンテンツがあるかどうかを確認するにはどうすればよいですか。

4

2 に答える 2

0

それは水路の問題だったことがわかりました。Flumeは、サイズ32kbのイベントを転送するように最適化されています。32kbを超えるものはすべて、Flumeがベイルアウトします。(回避策は、イベントサイズを32KBより大きくなるように調整することです)。そのため、少なくとも20,000文字をバッファリングするようにコードを変更しました。それは一種の作品ですが、絶対確実ではありません。バッファ長が32kbを超える場合でもこれは失敗する可能性がありますが、1時間のテストでは失敗していません。これは、Twitterがパブリックストリームで大量のデータを送信しないという事実に関係していると思います。

while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if(buffer.length()>20000) break;
        }
于 2012-04-01T13:25:48.787 に答える
0

問題は、コードが接続をリークしていることです。コンテンツストリームを閉じるか、リクエストを中止するかに関係なく、必ず確認してください。

    InputStream instream = response.getEntity().getContent();
    try {
        BufferedReader reader = new BufferedReader(
               new InputStreamReader(instream));
        String line = null;
        StringBuffer buffer = new StringBuffer();
        while ((line = reader.readLine()) != null) {
            buffer.append(line);
            if (buffer.length()>30000) {
               httpPost.abort();
               // connection will not be re-used
               break;
            }
        }
        return new EventImpl(buffer.toString().getBytes());
    } finally {
        // if request is not aborted the connection can be re-used
        try {
          instream.close();
        } catch (IOException ex) {
          // log or ignore
        }
    }
于 2012-03-28T19:56:19.693 に答える