CVA(信用評価調整)を計算するためにActivePivotインスタンスを検討します。
多数のセル(各カウンターパーティに対して20k)にロジックを適用する必要があり、それぞれがサイズ10kのfloat配列に関連付けられています。ActivePivotが大規模なマルチスレッド化されている場合でも、ABasicPostProcessorは各範囲の場所にモノスレッド方式で適用されます。マルチスレッドの方法でポイントの場所を計算するにはどうすればよいですか?
CVA(信用評価調整)を計算するためにActivePivotインスタンスを検討します。
多数のセル(各カウンターパーティに対して20k)にロジックを適用する必要があり、それぞれがサイズ10kのfloat配列に関連付けられています。ActivePivotが大規模なマルチスレッド化されている場合でも、ABasicPostProcessorは各範囲の場所にモノスレッド方式で適用されます。マルチスレッドの方法でポイントの場所を計算するにはどうすればよいですか?
マルチスレッドでdoEvaluationの呼び出しを追加するだけで、ABasicPostProcessor(ポイントごとのポストプロセッサーの高速実装を可能にするコアクラス)に特化した次のクラスを作成しました。
ABasicPostProcessorの特殊化を考えると、並列評価を取得するには、AParallelBasicPostProcessorを拡張する必要があります。
/**
* Specialization of ABasicPostProcessor which will call doEvaluation in a
* multithreaded way
*
* @author BLA
*/
public abstract class AParallelBasicPostProcessor<OutputType> extends ABasicPostProcessor<OutputType> {
private static final long serialVersionUID = -3453966549173516186L;
public AParallelBasicPostProcessor(String name, IActivePivot pivot) {
super(name, pivot);
}
@Override
public void evaluate(ILocation location, final IAggregatesRetriever retriever) throws QuartetException {
// Retrieve required aggregates
final ICellSet cellSet = retriever.retrieveAggregates(Collections.singleton(location), Arrays.asList(prefetchMeasures));
// Prepare a List
List<ALocatedRecursiveTask<OutputType>> tasks = new ArrayList<ALocatedRecursiveTask<OutputType>>();
// Create the procedure to hold the parallel sub-tasks
final ICellsProcedure subTasksGeneration = makeSubTasksGenerationProcedure(tasks);
cellSet.forEachLocation(subTasksGeneration, underlyingMeasures);
ForkJoinTask.invokeAll(tasks);
for (ALocatedRecursiveTask<OutputType> task : tasks) {
OutputType returnValue;
try {
returnValue = task.get();
} catch (InterruptedException e) {
throw new RuntimeException(e);
} catch (ExecutionException e) {
// re-throw the root cause of the ExecutionException
throw new RuntimeException(e.getCause());
}
// We can write only non-null aggregates
if (null != returnValue) {
writeInRetriever(retriever, task.getLocation(), returnValue);
}
}
}
protected void writeInRetriever(IAggregatesRetriever retriever, ILocation location, OutputType returnValue) {
retriever.write(location, returnValue);
}
protected ICellsProcedure makeSubTasksGenerationProcedure(List<ALocatedRecursiveTask<OutputType>> futures) {
return new SubTasksGenerationProcedure(futures);
}
/**
* {@link ICellsProcedure} registering a {@link ALocatedRecursiveTask} per
* point location
*/
protected class SubTasksGenerationProcedure implements ICellsProcedure {
protected List<ALocatedRecursiveTask<OutputType>> futures;
public SubTasksGenerationProcedure(List<ALocatedRecursiveTask<OutputType>> futures) {
this.futures = futures;
}
@Override
public boolean execute(final ILocation pointLocation, int rowId, Object[] measures) {
// clone the array of measures as it is internally used as a buffer
final Object[] clone = measures.clone();
futures.add(makeLocatedFuture(pointLocation, clone));
return true;
}
}
protected ALocatedRecursiveTask<OutputType> makeLocatedFuture(ILocation pointLocation, Object[] measures) {
return new LocatedRecursiveTask(pointLocation, measures);
}
/**
* A specialization of RecursiveTask by associating it to a
* {@link ILocation}
*
* @author BLA
*
*/
protected static abstract class ALocatedRecursiveTask<T> extends RecursiveTask<T> {
private static final long serialVersionUID = -6014943980790547011L;
public abstract ILocation getLocation();
}
/**
* Default implementation of {@link ALocatedRecursiveTask}
*
* @author BLA
*
*/
protected class LocatedRecursiveTask extends ALocatedRecursiveTask<OutputType> {
private static final long serialVersionUID = 676859831679236794L;
protected ILocation pointLocation;
protected Object[] measures;
public LocatedRecursiveTask(ILocation pointLocation, Object[] measures) {
this.pointLocation = pointLocation;
this.measures = measures;
if (pointLocation.isRange()) {
throw new RuntimeException(this.getClass() + " accepts only point location: " + pointLocation);
}
}
@Override
protected OutputType compute() {
try {
// The custom evaluation will be computed in parallel
return AParallelBasicPostProcessor.this.doEvaluation(pointLocation, measures);
} catch (QuartetException e) {
throw new RuntimeException(e);
}
}
@Override
public ILocation getLocation() {
return pointLocation;
}
}
}
ActivePivotクエリエンジンは高度にマルチスレッド化されており、単一のクエリ内での複数のポストプロセッサの呼び出しは並行して実行されます(もちろん、1つが別の結果に依存しない場合)。同じポストプロセッサがクエリに関係する場所で複数回実行されると、それも並行して実行されます。したがって、袖をまくり上げる前に、クエリプランにもっと明白なボトルネックがないかどうかを確認する価値があります。
現在、1つの場所での1つのポストプロセッサの呼び出しは、ActivePivotクエリエンジンでは分割できないワークロードです。また、集計がナノ秒単位で合計される数値だけでなく、ベクトルなどの大きなオブジェクトや構造化されたオブジェクトの場合、並列処理によってパフォーマンスを向上させる余地がある可能性があります。
ActivePivotクエリエンジンは、fork / joinプール(http://docs.oracle.com/javase/tutorial/essential/concurrency/forkjoin.html)の上に構築されています。つまり、ポストプロセッサコードは常にフォーク参加プール内から呼び出され、独自のサブタスクをフォークしてからそれらに参加することができます。これは専門家のトリックと見なされます。フォーク結合プールがどのように機能するかを十分に理解せずに試してはいけません。
評価された場所ごとに、いくつかのメジャーの最大値を計算するポストプロセッサについて考えてみましょう。
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.postprocessing.impl.ABasicPostProcessor;
import com.quartetfs.fwk.QuartetException;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
/**
*
* Post processor that computes the MAX of several measures.
*
* @author Quartet FS
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.postprocessing.IPostProcessor", key = MaxPostProcessor.TYPE)
public class MaxPostProcessor extends ABasicPostProcessor<Double> {
/** serialVersionUID */
private static final long serialVersionUID = -8886545079342151420L;
/** Plugin type */
public static final String TYPE = "MAX";
public MaxPostProcessor(String name, IActivePivot pivot) {
super(name, pivot);
}
@Override
public String getType() { return TYPE; }
@Override
protected Double doEvaluation(ILocation location, Object[] measures) throws QuartetException {
double max = ((Number) measures[0]).doubleValue();
for(int i = 1; i < measures.length; i++) {
max = Math.max(max, ((Number) measures[i]).doubleValue());
}
return max;
}
}
そのポストプロセッサでは、評価された範囲の場所から生じるリーフの場所が次々に計算されます。代わりにタスクを作成し、フォーク結合プールを介してそれらのタスクを並行して実行することを決定できます。以下があなたを始めることを願っています:
package com.quartetfs.pivot.sandbox.postprocessor.impl;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import jsr166y.ForkJoinTask;
import jsr166y.RecursiveTask;
import com.quartetfs.biz.pivot.IActivePivot;
import com.quartetfs.biz.pivot.ILocation;
import com.quartetfs.biz.pivot.cellset.ICellSet;
import com.quartetfs.biz.pivot.cellset.ICellsProcedure;
import com.quartetfs.biz.pivot.query.aggregates.IAggregatesRetriever;
import com.quartetfs.fwk.QuartetException;
import com.quartetfs.fwk.QuartetExtendedPluginValue;
/**
*
* Post processor that computes the MAX of several measures,
* evaluation of locations is performed in parallel.
*
* @author Quartet FS
*
*/
@QuartetExtendedPluginValue(interfaceName = "com.quartetfs.biz.pivot.postprocessing.IPostProcessor", key = ParallelMaxPostProcessor.TYPE)
public class ParallelMaxPostProcessor extends MaxPostProcessor {
/** serialVersionUID */
private static final long serialVersionUID = -8886545079342151420L;
/** Plugin type */
public static final String TYPE = "PMAX";
public ParallelMaxPostProcessor(String name, IActivePivot pivot) {
super(name, pivot);
}
@Override
public String getType() { return TYPE; }
@Override
public void evaluate(ILocation location, IAggregatesRetriever retriever)throws QuartetException {
try {
// Retrieve required aggregates
ICellSet cellSet = retriever.retrieveAggregates(Collections.singleton(location), Arrays.asList(prefetchMeasures));
// Evaluate the cell set to create tasks
ParallelEvaluationProcedure evalProcedure = new ParallelEvaluationProcedure();
cellSet.forEachLocation(evalProcedure);
// Execute the tasks in parallel and write results
evalProcedure.writeResults(retriever);
} catch(Exception e) {
throw new QuartetException("Evaluation of " + this + " on location " + location + " failed.", e);
}
}
/**
* Procedure evaluated on the cell set.
*/
protected class ParallelEvaluationProcedure implements ICellsProcedure {
/** List of tasks */
protected final List<MaxComputation> tasks = new ArrayList<ParallelMaxPostProcessor.MaxComputation>();
@Override
public boolean execute(ILocation location, int rowId, Object[] measures) {
Object[] numbers = measures.clone();
tasks.add(new MaxComputation(location, numbers));
return true; // continue
}
/** Once all the tasks are executed, write results */
public void writeResults(IAggregatesRetriever retriever) throws Exception {
// Invoke all the tasks in parallel
// using the fork join pool that runs the post processor.
ForkJoinTask.invokeAll(tasks);
for(MaxComputation task : tasks) {
retriever.write(task.location, task.get());
}
}
}
/**
* Max computation task. It illustrates our example well
* but in real-life this would be too little
* of a workload to deserve parallel execution.
*/
protected class MaxComputation extends RecursiveTask<Double> {
/** serialVersionUID */
private static final long serialVersionUID = -5843737025175189495L;
final ILocation location;
final Object[] numbers;
public MaxComputation(ILocation location, Object[] numbers) {
this.location = location;
this.numbers = numbers;
}
@Override
protected Double compute() {
try {
return doEvaluation(location, numbers);
} catch (QuartetException e) {
completeExceptionally(e);
return null;
}
}
}
}