これが私の DataClientFactory クラスです。
public class DataClientFactory {
public static IClient getInstance() {
return ClientHolder.INSTANCE;
}
private static class ClientHolder {
private static final DataClient INSTANCE = new DataClient();
static {
new DataScheduler().startScheduleTask();
}
}
}
これが私の DataClient クラスです。
public class DataClient implements IClient {
private ExecutorService service = Executors.newFixedThreadPool(15);
private RestTemplate restTemplate = new RestTemplate();
// for initialization purpose
public DataClient() {
try {
new DataScheduler().callDataService();
} catch (Exception ex) { // swallow the exception
// log exception
}
}
@Override
public DataResponse getDataSync(DataKey dataKeys) {
DataResponse response = null;
try {
Future<DataResponse> handle = getDataAsync(dataKeys);
response = handle.get(dataKeys.getTimeout(), TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
// log error
response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
} catch (Exception e) {
// log error
response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
}
return response;
}
@Override
public Future<DataResponse> getDataAsync(DataKey dataKeys) {
Future<DataResponse> future = null;
try {
DataTask dataTask = new DataTask(dataKeys, restTemplate);
future = service.submit(dataTask);
} catch (Exception ex) {
// log error
}
return future;
}
}
以下に示すように、上記のファクトリからクライアント インスタンスを取得し、オブジェクトgetDataSyncを渡してメソッドを呼び出しDataKeyます。DataKey オブジェクトにはuserIdと のTimeout値があります。この後、呼び出しが呼び出されるとすぐに、呼び出しが私のDataTaskクラスに行きます。callhandle.get
IClient dataClient = DataClientFactory.getInstance();
long userid = 1234l;
long timeout_ms = 500;
DataKey keys = new DataKey.Builder().setUserId(userid).setTimeout(timeout_ms)
.remoteFlag(false).secondaryFlag(true).build();
// call getDataSync method
DataResponse dataResponse = dataClient.getDataSync(keys);
System.out.println(dataResponse);
ここにすべてのロジックを持つ私の DataTask クラスがあります -
public class DataTask implements Callable<DataResponse> {
private DataKey dataKeys;
private RestTemplate restTemplate;
public DataTask(DataKey dataKeys, RestTemplate restTemplate) {
this.restTemplate = restTemplate;
this.dataKeys = dataKeys;
}
@Override
public DataResponse call() {
DataResponse dataResponse = null;
ResponseEntity<String> response = null;
int serialId = getSerialIdFromUserId();
boolean remoteFlag = dataKeys.isRemoteFlag();
boolean secondaryFlag = dataKeys.isSecondaryFlag();
List<String> hostnames = new LinkedList<String>();
Mappings mappings = ClientData.getMappings(dataKeys.whichFlow());
String localPrimaryAdress = null;
String remotePrimaryAdress = null;
String localSecondaryAdress = null;
String remoteSecondaryAdress = null;
// use mappings object to get above Address by using serialId and basis on
// remoteFlag and secondaryFlag populate the hostnames linked list
if (remoteFlag && secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
hostnames.add(localSecondaryHostIPAdress);
hostnames.add(remotePrimaryHostIPAdress);
hostnames.add(remoteSecondaryHostIPAdress);
} else if (remoteFlag && !secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
hostnames.add(remotePrimaryHostIPAdress);
} else if (!remoteFlag && !secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
} else if (!remoteFlag && secondaryFlag) {
hostnames.add(localPrimaryHostIPAdress);
hostnames.add(localSecondaryHostIPAdress);
}
for (String hostname : hostnames) {
// If host name is null or host name is in local block host list, skip sending request to this host
if (hostname == null || ClientData.isHostBlocked(hostname)) {
continue;
}
try {
String url = generateURL(hostname);
response = restTemplate.exchange(url, HttpMethod.GET, dataKeys.getEntity(), String.class);
// make DataResponse
break;
} catch (HttpClientErrorException ex) {
// make DataResponse
return dataResponse;
} catch (HttpServerErrorException ex) {
// make DataResponse
return dataResponse;
} catch (RestClientException ex) {
// If it comes here, then it means some of the servers are down.
// Add this server to block host list
ClientData.blockHost(hostname);
// log an error
} catch (Exception ex) {
// If it comes here, then it means some weird things has happened.
// log an error
// make DataResponse
}
}
return dataResponse;
}
private String generateURL(final String hostIPAdress) {
// make an url
}
private int getSerialIdFromUserId() {
// get the id
}
}
に基づいてuserId、 を取得してserialIdからホスト名のリストを取得します。渡されたフラグに応じて呼び出しを行うことを想定しています。次に、リストを繰り返し処理hostnamesし、サーバーを呼び出します。たとえば、リンクされたリストに 4 つのホスト名 (A、B、C、D) がある場合、最初に A を呼び出し、データが返されたら DataResponse を返します。しかし、A がダウンしている場合、他のスレッドが A のホスト名を呼び出せないように、すぐに A をブロック リストに追加する必要があるとします。次に、ホスト名 B に呼び出しを行い、データを取得して応答を返します (または、B もダウンしている場合は同じことを繰り返します)。
10 分ごとに実行されるバックグラウンド スレッドもあり、ファクトリからクライアント インスタンスを取得するとすぐに開始され、別のサービス URL を解析して、呼び出しを行うべきではないブロック ホスト名のリストを取得します。10分ごとに実行されるため、ダウンしているサーバーは10分後にのみリストを取得します。一般に、Aがダウンしている場合、私のサービスはホスト名のブロックリストとしてAを提供し、Aがアップするとすぐに提供します、そのリストも 10 分後に更新されます。
これが私のバックグラウンド スレッド コード DataScheduler です。
public class DataScheduler {
private RestTemplate restTemplate = new RestTemplate();
private static final Gson gson = new Gson();
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public void startScheduleTask() {
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
try {
callDataService();
} catch (Exception ex) {
// log an error
}
}
}, 0, 10L, TimeUnit.MINUTES);
}
public void callDataService() throws Exception {
String url = null;
// execute the url and get the responseMap from it as a string
parseResponse(responseMap);
}
private void parseResponse(Map<FlowsEnum, String> responses) throws Exception {
// .. some code here to calculate partitionMappings
// block list of hostnames
Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
for (Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
for (String hosts : entry.getValue()) {
blockList.add(hosts);
}
}
if (update) {
ClientData.setAllMappings(partitionMappings);
}
// update the block list of hostnames
if (!DataUtils.isEmpty(responses)) {
ClientData.replaceBlockedHosts(blockList);
}
}
}
そして、ホスト名のブロック リストと partitionMappings の詳細 (有効なホスト名のリストを取得するために使用される) のすべての情報を保持する ClientData クラスを次に示します。
public class ClientData {
private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = new AtomicReference<ConcurrentHashMap<String, String>>(
new ConcurrentHashMap<String, String>());
// some code here to set the partitionMappings by using CountDownLatch
// so that read is blocked for first time reads
public static boolean isHostBlocked(String hostName) {
return blockedHosts.get().contains(hostName);
}
public static void blockHost(String hostName) {
blockedHosts.get().put(hostName, hostName);
}
public static void replaceBlockedHosts(List<String> blockList) {
ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
for (String hostName : blockList) {
newBlockedHosts.put(hostName, hostName);
}
blockedHosts.set(newBlockedHosts);
}
}
問題文:-
TimeoutExceptionすべてのサーバー (例として A、B、C、D) が起動している場合、上記のコードは正常に動作し、何も起きていませんhandle.getが、1 つのサーバー (A) がダウンしたとします。メインスレッドから呼び出しを行うと、多くの、つまりTimeoutException、膨大な数のクライアントタイムアウトが発生し始めます。
そして、なぜこれが起こっているのかわからないのですか?サーバーがダウンするとすぐにblockListに追加され、そのサーバーを呼び出すスレッドがなくなり、代わりにリスト内の別のサーバーが試行されるため、一般にこれは正しく行われませんか? したがって、プロセスはスムーズで、これらのサーバーが起動するとすぐに、blockList がバックグラウンド スレッドから更新され、呼び出しを開始できるようになります。
上記のコードに、この問題を引き起こす可能性のある問題はありますか? どんな提案でも大いに役立ちます。
一般に、私がやろうとしているのは、マッピング オブジェクトを使用して渡されるユーザー ID に応じてホスト名リストを作成することです。次に、最初のホスト名を呼び出して、応答を返します。ただし、そのホスト名がダウンしている場合は、ブロック リストに追加して、リストの 2 番目のホスト名を呼び出します。
ここに私が見ているスタックトレースがあります -
java.util.concurrent.TimeoutException\n\tat java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258)
java.util.concurrent.FutureTask.get(FutureTask.java:119)\n\tat com.host.client.DataClient.getDataSync(DataClient.java:20)\n\tat
注: 複数の userId の場合、同じサーバーを持つことができます。つまり、サーバー A は複数の userId に解決できます。