4

これが私の DataClientFactory クラスです。

public class DataClientFactory {
    public static IClient getInstance() {
        return ClientHolder.INSTANCE;
    }

    private static class ClientHolder {
        private static final DataClient INSTANCE = new DataClient();
        static {
            new DataScheduler().startScheduleTask();
        }
    }
}

これが私の DataClient クラスです。

public class DataClient implements IClient {

    private ExecutorService service = Executors.newFixedThreadPool(15);
    private RestTemplate restTemplate = new RestTemplate();

    // for initialization purpose
    public DataClient() {
        try {
            new DataScheduler().callDataService();
        } catch (Exception ex) { // swallow the exception
            // log exception
        }
    }

    @Override
    public DataResponse getDataSync(DataKey dataKeys) {
        DataResponse response = null;
        try {
            Future<DataResponse> handle = getDataAsync(dataKeys);
            response = handle.get(dataKeys.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // log error
            response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
        } catch (Exception e) {
            // log error
            response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);
        }

        return response;
    }

    @Override
    public Future<DataResponse> getDataAsync(DataKey dataKeys) {
        Future<DataResponse> future = null;
        try {
            DataTask dataTask = new DataTask(dataKeys, restTemplate);
            future = service.submit(dataTask);
        } catch (Exception ex) {
            // log error
        }

        return future;
    }
}

以下に示すように、上記のファクトリからクライアント インスタンスを取得し、オブジェクトgetDataSyncを渡してメソッドを呼び出しDataKeyます。DataKey オブジェクトにはuserIdと のTimeout値があります。この後、呼び出しが呼び出されるとすぐに、呼び出しが私のDataTaskクラスに行きます。callhandle.get

IClient dataClient = DataClientFactory.getInstance();

long userid = 1234l;
long timeout_ms = 500;

DataKey keys = new DataKey.Builder().setUserId(userid).setTimeout(timeout_ms)
            .remoteFlag(false).secondaryFlag(true).build();

// call getDataSync method
DataResponse dataResponse = dataClient.getDataSync(keys);
System.out.println(dataResponse);

ここにすべてのロジックを持つ私の DataTask クラスがあります -

public class DataTask implements Callable<DataResponse> {

    private DataKey dataKeys;
    private RestTemplate restTemplate;

    public DataTask(DataKey dataKeys, RestTemplate restTemplate) {
        this.restTemplate = restTemplate;
        this.dataKeys = dataKeys;
    }

    @Override
    public DataResponse call() {

        DataResponse dataResponse = null;
        ResponseEntity<String> response = null;

        int serialId = getSerialIdFromUserId();

        boolean remoteFlag = dataKeys.isRemoteFlag();
        boolean secondaryFlag = dataKeys.isSecondaryFlag();

        List<String> hostnames = new LinkedList<String>();

        Mappings mappings = ClientData.getMappings(dataKeys.whichFlow());

        String localPrimaryAdress = null;
        String remotePrimaryAdress = null;
        String localSecondaryAdress = null;
        String remoteSecondaryAdress = null;

        // use mappings object to get above Address by using serialId and basis on 
        // remoteFlag and secondaryFlag populate the hostnames linked list

        if (remoteFlag && secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
            hostnames.add(localSecondaryHostIPAdress);
            hostnames.add(remotePrimaryHostIPAdress);
            hostnames.add(remoteSecondaryHostIPAdress);
        } else if (remoteFlag && !secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
            hostnames.add(remotePrimaryHostIPAdress);
        } else if (!remoteFlag && !secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
        } else if (!remoteFlag && secondaryFlag) {
            hostnames.add(localPrimaryHostIPAdress);
            hostnames.add(localSecondaryHostIPAdress);
        }

        for (String hostname : hostnames) {
            // If host name is null or host name is in local block host list, skip sending request to this host
            if (hostname == null || ClientData.isHostBlocked(hostname)) {
                continue;
            }

            try {
                String url = generateURL(hostname);
                response = restTemplate.exchange(url, HttpMethod.GET, dataKeys.getEntity(), String.class);

                // make DataResponse

                break;

            } catch (HttpClientErrorException ex) {
                // make DataResponse
                return dataResponse;
            } catch (HttpServerErrorException ex) {
                // make DataResponse
                return dataResponse;
            } catch (RestClientException ex) {
                // If it comes here, then it means some of the servers are down.
                // Add this server to block host list 
                ClientData.blockHost(hostname);
                // log an error

            } catch (Exception ex) {
                // If it comes here, then it means some weird things has happened.
                // log an error
                // make DataResponse
            }
        }

        return dataResponse;
    }

    private String generateURL(final String hostIPAdress) {
        // make an url
    }


    private int getSerialIdFromUserId() {
        // get the id
    }
}

に基づいてuserId、 を取得してserialIdからホスト名のリストを取得します。渡されたフラグに応じて呼び出しを行うことを想定しています。次に、リストを繰り返し処理hostnamesし、サーバーを呼び出します。たとえば、リンクされたリストに 4 つのホスト名 (A、B、C、D) がある場合、最初に A を呼び出し、データが返されたら DataResponse を返します。しかし、A がダウンしている場合、他のスレッドが A のホスト名を呼び出せないように、すぐに A をブロック リストに追加する必要があるとします。次に、ホスト名 B に呼び出しを行い、データを取得して応答を返します (または、B もダウンしている場合は同じことを繰り返します)。

10 分ごとに実行されるバックグラウンド スレッドもあり、ファクトリからクライアント インスタンスを取得するとすぐに開始され、別のサービス URL を解析して、呼び出しを行うべきではないブロック ホスト名のリストを取得します。10分ごとに実行されるため、ダウンしているサーバーは10分後にのみリストを取得します。一般に、Aがダウンしている場合、私のサービスはホスト名のブロックリストとしてAを提供し、Aがアップするとすぐに提供します、そのリストも 10 分後に更新されます。

これが私のバックグラウンド スレッド コード DataScheduler です。

public class DataScheduler {

    private RestTemplate restTemplate = new RestTemplate();
    private static final Gson gson = new Gson();

    private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);

    public void startScheduleTask() {
        scheduler.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    callDataService();
                } catch (Exception ex) {
                    // log an error
                }
            }
        }, 0, 10L, TimeUnit.MINUTES);
    }

    public void callDataService() throws Exception {
        String url = null;

        // execute the url and get the responseMap from it as a string

        parseResponse(responseMap);
    }


    private void parseResponse(Map<FlowsEnum, String> responses) throws Exception {

        // .. some code here to calculate partitionMappings

        // block list of hostnames 
        Map<String, List<String>> coloExceptionList = gson.fromJson(response.split("blocklist=")[1], Map.class);
        for (Map.Entry<String, List<String>> entry : coloExceptionList.entrySet()) {
            for (String hosts : entry.getValue()) {
                blockList.add(hosts);
            }
        }

        if (update) {
            ClientData.setAllMappings(partitionMappings);
        }

        // update the block list of hostnames
        if (!DataUtils.isEmpty(responses)) {
            ClientData.replaceBlockedHosts(blockList);
        }
    }
}

そして、ホスト名のブロック リストと partitionMappings の詳細 (有効なホスト名のリストを取得するために使用される) のすべての情報を保持する ClientData クラスを次に示します。

public class ClientData {

    private static final AtomicReference<ConcurrentHashMap<String, String>> blockedHosts = new AtomicReference<ConcurrentHashMap<String, String>>(
            new ConcurrentHashMap<String, String>());


    // some code here to set the partitionMappings by using CountDownLatch 
    // so that read is blocked for first time reads

    public static boolean isHostBlocked(String hostName) {
        return blockedHosts.get().contains(hostName);
    }

    public static void blockHost(String hostName) {
        blockedHosts.get().put(hostName, hostName);
    }

    public static void replaceBlockedHosts(List<String> blockList) {
        ConcurrentHashMap<String, String> newBlockedHosts = new ConcurrentHashMap<>();
        for (String hostName : blockList) {
            newBlockedHosts.put(hostName, hostName);
        }
        blockedHosts.set(newBlockedHosts);
    }
}

問題文:-

TimeoutExceptionすべてのサーバー (例として A、B、C、D) が起動している場合、上記のコードは正常に動作し、何も起きていませんhandle.getが、1 つのサーバー (A) がダウンしたとします。メインスレッドから呼び出しを行うと、多くの、つまりTimeoutException、膨大な数のクライアントタイムアウトが発生し始めます。

そして、なぜこれが起こっているのかわからないのですか?サーバーがダウンするとすぐにblockListに追加され、そのサーバーを呼び出すスレッドがなくなり、代わりにリスト内の別のサーバーが試行されるため、一般にこれは正しく行われませんか? したがって、プロセスはスムーズで、これらのサーバーが起動するとすぐに、blockList がバックグラウンド スレッドから更新され、呼び出しを開始できるようになります。

上記のコードに、この問題を引き起こす可能性のある問題はありますか? どんな提案でも大いに役立ちます。

一般に、私がやろうとしているのは、マッピング オブジェクトを使用して渡されるユーザー ID に応じてホスト名リストを作成することです。次に、最初のホスト名を呼び出して、応答を返します。ただし、そのホスト名がダウンしている場合は、ブロック リストに追加して、リストの 2 番目のホスト名を呼び出します。

ここに私が見ているスタックトレースがあります -

java.util.concurrent.TimeoutException\n\tat java.util.concurrent.FutureTask$Sync.innerGet(FutureTask.java:258)
java.util.concurrent.FutureTask.get(FutureTask.java:119)\n\tat com.host.client.DataClient.getDataSync(DataClient.java:20)\n\tat 

注: 複数の userId の場合、同じサーバーを持つことができます。つまり、サーバー A は複数の userId に解決できます。

4

2 に答える 2

0

DataClient クラスの以下の行:

public class DataClient implements IClient {

----code code---

        Future<DataResponse> handle = getDataAsync(dataKeys);

//BELOW LINE IS PROBLEM

        response = handle.get(dataKeys.getTimeout(), TimeUnit.MILLISECONDS); // <--- HERE
    } catch (TimeoutException e) {
        // log error
        response = new DataResponse(null, DataErrorEnum.CLIENT_TIMEOUT, DataStatusEnum.ERROR);
    } catch (Exception e) {
        // log error
        response = new DataResponse(null, DataErrorEnum.ERROR_CLIENT, DataStatusEnum.ERROR);

----code code-----

handle.get(...) にタイムアウトを割り当てました。これは、REST 接続が応答する前にタイムアウトになります。残りの接続自体はタイムアウトする場合としない場合がありますが、スレッドの実行が完了する前に将来の get メソッドをタイムアウトしているため、ホストのブロックには目に見える効果はありませんが、call メソッド内のコードはDataTask が期待どおりに動作している可能性があります。お役に立てれば。

于 2014-09-04T07:17:06.893 に答える
0

提案について尋ねられたので、ここにいくつかの提案を示します

。 1.)予期しない戻り値
メソッドが予期せずFALSEを返す

if (ClientData.isHostBlocked(hostname)) //this may return always false! please check

2.)例外処理RestClientException
が発生する ことを本当に確信していますか? この例外が発生した場合にのみ、ホストはブロック リストに追加されます。 投稿されたコードはログを無視しているようです (コメントアウトされています!)

        ...catch (HttpClientErrorException ex) {
            // make DataResponse
            return dataResponse;
        } catch (HttpServerErrorException ex) {
            // make DataResponse
            return dataResponse;
        } catch (RestClientException ex) {
            // If it comes here, then it means some of the servers are down.
            // Add this server to block host list 
            ClientData.blockHost(hostname);
            // log an error

        } catch (Exception ex) {
            // If it comes here, then it means some weird things has happened.
            // log an error
            // make DataResponse
        }
于 2014-09-06T20:25:23.407 に答える