DataStream フローの計算結果を HTTP プロトコルで他のサービスに送信したい。それを実装する方法として、次の 2 つの方法が考えられます。
- シンクで同期 Apache HttpClient クライアントを使用する
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
private static final String URL = "http://httpbin.org/post";
private CloseableHttpClient httpClient;
private Histogram httpStatusesAccumulator;
@Override
public void open(Configuration parameters) throws Exception {
httpClient = HttpClients.custom()
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
}
@Override
public void close() throws Exception {
httpClient.close();
httpStatusesAccumulator.resetLocal();
}
@Override
public void invoke(SessionItem value) throws Exception {
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
HttpPost httpPost = new HttpPost(URL);
httpPost.setEntity(entity);
try(CloseableHttpResponse response = httpClient.execute(httpPost)) {
int httpStatusCode = response.getStatusLine().getStatusCode();
httpStatusesAccumulator.add(httpStatusCode);
}
}
}
- シンクで非同期 Apache HttpAsyncClient クライアントを使用する
public class AsyncHttpSink extends RichSinkFunction<SessionItem> {
private static final String URL = "http://httpbin.org/post";
private CloseableHttpAsyncClient httpClient;
private Histogram httpStatusesAccumulator;
@Override
public void open(Configuration parameters) throws Exception {
httpClient = HttpAsyncClients.custom()
.setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
.build();
httpClient.start();
httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
}
@Override
public void close() throws Exception {
httpClient.close();
httpStatusesAccumulator.resetLocal();
}
@Override
public void invoke(SessionItem value) throws Exception {
List<NameValuePair> params = new ArrayList<>();
params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));
UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);
HttpPost httpPost = new HttpPost(URL);
httpPost.setEntity(entity);
httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
@Override
public void completed(HttpResponse response) {
int httpStatusCode = response.getStatusLine().getStatusCode();
httpStatusesAccumulator.add(httpStatusCode);
}
@Override
public void failed(Exception ex) {
httpStatusesAccumulator.add(-1); // -1 - failed
}
@Override
public void cancelled() {
httpStatusesAccumulator.add(-2); // -2 - cancelled
}
});
}
}
質問:
シンクで同期または非同期の HTTP クライアントを使用する必要がありますか?
同期クライアントを使用する場合、それはシンクをブロックし、バック プレッシャーによって Flink はソースをブロックします。右?
非同期クライアントを使用する場合、シンクはブロックされません。右?
アキュムレータはスレッドセーフではありませんか? つまり、非同期コールバックで使用できますか?
RuntimeContext はスレッドセーフではありませんか? つまり、非同期コールバックで使用できますか?