1

説明するのは少し複雑ですが、ここまでです。基本的には、「いかに効率的に問題を部分問題に分解するか」が課題です。ここでの「効率的」とは、分割された部分問題が可能な限り大きいことを意味します。基本的に、問題を分解する必要がまったくなければ理想的です。ただし、ワーカーは問題の特定のブロックにしか取り組むことができないため、私は解散する必要があります。しかし、これをできるだけ粗くす​​る方法を見つけたいと思っています。

ここにいくつかの擬似コードがあります..

このような問題があります (Java で申し訳ありません。わからない場合は、喜んで説明します)。

class Problem {
    final Set<Integer> allSectionIds = { 1,2,4,6,7,8,10 };
    final Data data = //Some data
}

そして、副問題は次のとおりです。

class SubProblem {
    final Set<Integer> targetedSectionIds;
    final Data data;

    SubProblem(Set<Integer> targetedSectionsIds, Data data){
        this.targetedSectionIds = targetedSectionIds;
        this.data = data;
    }
}

すると、作業はこのようになります。

class Work implements Runnable {
    final Set<Section> subSections;
    final Data data;
    final Result result;

    Work(Set<Section> subSections, Data data) {
        this.sections = SubSections;
        this.data = data;
    }

    @Override
    public void run(){
        for(Section section : subSections){
            result.addUp(compute(data, section));
        }
    }
}

これで、独自の state を持つ「Worker」のインスタンスができましたsections I have

class Worker implements ExecutorService {
    final Map<Integer,Section> sectionsIHave;
    {
        sectionsIHave = {1:section1, 5:section5, 8:section8 };
    }

    final ExecutorService executor = //some executor.

    @Override
    public void execute(SubProblem problem){
        Set<Section> sectionsNeeded = fetchSections(problem.targetedSectionIds);
        super.execute(new Work(sectionsNeeded, problem.data);
    }

}

ふぅ。

そのため、私たちは多くの を持っており、より多くのProblemWorkers常に求めていますSubProblems。私の仕事は、彼らに別れProblemsSubProblem告げることです。ただし、難しいのは、後で SubProblems のすべての結果を収集し、それらをResult全体の にマージ (削減) する必要があることProblemです。

ただし、これにはコストがかかるため、ワーカーにできるだけ大きな「チャンク」を提供したいと考えています (できるだけ多くのチャンクを持ってtargetedSectionsいます)。

完璧である必要はありません (数学的に可能な限り効率的か何か)。つまり、各計算にかかる時間を予測できないなどの理由で、完全な解決策を見つけることは不可能だと思います。しかし、これに対する優れたヒューリスティックな解決策はありますか? それとも、設計に入る前に読むことができるリソースはありますか?

どんなアドバイスでも大歓迎です!

編集:セクション割り当ても制御できるため、それを制御することは別のオプションです。基本的に、これに関する唯一の制限は、ワーカーが固定数のセクションしか持つことができないということです。

4

1 に答える 1

1

ネットワーク サービスのシャーディング モデルがあるようです。同様のことを行い、特定のネットワーク サービスに接続する「クライアント」(ワーカー) に対して「entityId」(sectionId) の逆インデックスを使用します。その特定のエンティティを扱います。最も簡単な方法 (以下を参照) は、id から worker への逆マップを使用することです。メモリが制約である場合、関数を使用する別の可能性があります (例: sectionId % サービス数)。

サービスにできるだけ多くの作業を与えるために、ユーザーが指定した最大数までバッチ処理を行う単純なバッチ処理アルゴリズムがあります。これにより、リモート サービスがそれらを消費できる速度に応じて、作業のチャンクを大まかにサイズ設定できます。

public class Worker implements Runnable {

    private final Map<Integer, Section> sections;
    private final BlockingQueue<SubProblem> problemQ = new ArrayBlockingQueue<SubProblem>(4096);
    private final int batchSize;

    public Worker(final Map<Integer, Section> sectionsIHave, final int batchSize) {
        this.sections = sectionsIHave;
        this.batchSize = batchSize;
    }

    public Set<Integer> getSectionIds() {
        return sections.keySet();
    }

    public void execute(final SubProblem command) throws InterruptedException {

        if (sections.containsKey(command.getSectionId())) {
            problemQ.put(command);
        } else {
            throw new IllegalArgumentException("Invalid section id for worker: " + command.getSectionId());
        }

    }

    @Override
    public void run() {
        final List<SubProblem> batch = new ArrayList<SubProblem>(batchSize);
        while (!Thread.interrupted()) {
            batch.clear();

            try {
                batch.add(problemQ.take());
                for (int i = 1; i < batchSize; i++) {
                    final SubProblem problem = problemQ.poll();
                    if (problem != null) {
                        batch.add(problem);
                    } else {
                        break;
                    }

                    process(batch);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    private void process(final List<SubProblem> batch) {
        // Submit to remote process.
    }

    private static Map<Integer, Worker> indexWorkers(final List<Worker> workers) {
        final Map<Integer, Worker> temp = new HashMap<Integer, Worker>();
        for (final Worker worker : workers) {
            for (final Integer sectionId : worker.getSectionIds()) {
                temp.put(sectionId, worker);
            }
        }
        return Collections.unmodifiableMap(temp);
    }

    public static void main(final String[] args) throws InterruptedException {
     // Load workers, where worker is bound to single remote service
        final List<Worker> workers = getWorkers();
        final Map<Integer, Worker> workerReverseIndex = indexWorkers(workers);
        final List<SubProblem> subProblems = getSubProblems();
        for (final SubProblem problem : subProblems) {
            final Worker w = workerReverseIndex.get(problem.getSectionId());
            w.execute(problem);
        }
    }
}
于 2010-03-28T13:41:37.877 に答える