0
void AFCQueue::ExtractValuesSecComplex(int startIndex, int endIndex,int helperIndex)
{

int size = 0,i,index;
TimeType min_timestamp;
bool is_singleQueue = false;
TimeType* local_queue_time = helper_queue_time[helperIndex];
int* local_queue_value = helper_queue_value[helperIndex];
volatile int& in_local_helper = in_sec[helperIndex];
volatile int& out_local_helper = out_sec[helperIndex];

NodeArrayBlock * heads_local[_MAX_THREADS];
NodeArrayBlock *tails_local[_MAX_THREADS];
int outs_local[_MAX_THREADS];
int ins_local[_MAX_THREADS];
TimeType local_timearray[_MAX_THREADS];
int min_index = 0;

min_timestamp = timestamps_arr[startIndex];
for (i=startIndex,index=startIndex ; i < endIndex; i++){
    heads_local[i] = (NodeArrayBlock *)heads[i];
    outs_local[i]  = outs[i];
    tails_local[i] = (NodeArrayBlock *)tails[i];
    ins_local[i]   = ins[i];
    local_timearray[i] = timestamps_arr[i];

    if (local_timearray[i] < min_timestamp){
        min_timestamp = local_timearray[i];
        index = i;
    }
}

do{
    //if central queue is full 
    while((out_local_helper-1)==in_local_helper || 
        (out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) || _gIsStopThreads){
        if (_gIsStopThreads)
            return;
    }

    local_queue_time[in_local_helper] = heads_local[index]->_timestamp_arr[outs_local[index]];
    local_queue_value[in_local_helper] = heads_local[index]->_values_arr[outs_local[index]++];

    if (in_local_helper < HELPERS_QUEUE_SIZE_1)
        in_local_helper++;
    else
        in_local_helper = 0;    

    if (outs_local[index] == _INIT_SIZE){
        heads_local[index]->_free = true;
        heads_local[index] = heads_local[index]->_next;
        if (heads_local[index]==null)
        {
            tails_local[index] = null;
            ins_local[index]=0;
        }
        outs_local[index] = 0;
    }
    if (ins_local[index] == outs_local[index] &&
        heads_local[index]==tails_local[index])
    {
        //if it was not the last local queue in the array of snapshots
        if (--endIndex != index){
            heads_local[index]                  = heads_local[endIndex];
            tails_local[index]                  = tails_local[endIndex];
            outs_local[index]                   = outs_local[endIndex];
            ins_local[index]                    = ins_local[endIndex];
            local_timearray[index]              = local_timearray[endIndex];
        }
        if ((endIndex-startIndex)==1)
            is_singleQueue = true;
        heads_local[endIndex]                   = null;
    }else{
        local_timearray[index] = heads_local[index]->_timestamp_arr[outs_local[index]];
    }
    //If a single Queue left, no need to check timestamps
    if (is_singleQueue){
        int out = outs_local[startIndex];
        int in  = ins_local[startIndex];
        NodeArrayBlock* he = heads_local[startIndex];
        NodeArrayBlock* ta = tails_local[startIndex];
        int* value_arr = he->_values_arr;
        TimeType* time_arr = he->_timestamp_arr;
        while (true){
            if ((in == out && he==ta))
            {
                //heads[startIndex] = null;
                return;
            }
            if (out == _INIT_SIZE){
                he->_free = true;
                he = he->_next;
                if (he==null)
                {
                    //heads[startIndex]=null;
                    return;
                }
                value_arr = he->_values_arr;
                time_arr = he->_timestamp_arr;
                out = 0;
            }   
            while((out_local_helper-1)==in_local_helper || 
                (out_local_helper==0 && in_local_helper == HELPERS_QUEUE_SIZE_1) || 
                _gIsStopThreads){
                if (_gIsStopThreads)
                    return;
            }

            if (he==ta){
                if (out_local_helper <= in_local_helper){
                    min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,in-out);
                }else{
                    min_index = Math::Min(out_local_helper-1-in_local_helper,in-out);
                }
            }else{
                if (out_local_helper <= in_local_helper){
                    min_index = Math::Min(HELPERS_QUEUE_SIZE-in_local_helper,_INIT_SIZE-out);
                }else{
                    min_index = Math::Min(out_local_helper-1-in_local_helper,_INIT_SIZE-out);
                }
            }
            memcpy(&local_queue_time[in_local_helper],&time_arr[out],min_index * sizeof(*time_arr));
            memcpy(&local_queue_value[in_local_helper],&value_arr[out],min_index * sizeof(*value_arr));
            in_local_helper+=min_index;
            out+=min_index;
            if (in_local_helper == HELPERS_QUEUE_SIZE)
                in_local_helper = 0;
        }
    }
    if (endIndex==startIndex)
        break;

    min_timestamp = local_timearray[startIndex];
    for(i = startIndex+1,index=startIndex; i < endIndex ;i++){
        if (local_timearray[i] < min_timestamp){
            min_timestamp = local_timearray[i];
            index = i;
        }
    }
}while(true);
}

これは私のアルゴリズムのスニペットです。この関数は、多数のキューを反復処理する単一のスレッド専用です (タイムスタンプを持つキューと、値を持つキューがあります)。

このメソッドを実行する各スレッドは、X 個のキューを反復し、それらをタイムスタンプと値の単一の循環キューにマージします。

この関数はキャッシュミスが多く、

キャッシュミスを減らすためにどのように改善できますか (複数のスレッドが異なる ID - helperIndex でこのメソッドを同時に実行します)

4

2 に答える 2

1

これは実際にはコードの問題ではないため、コードの投稿は効果的ではありませんでした。

キャッシュミスを制限するには、DATAを変更して、各スレッドが一度に独自の[L1キャッシュサイズ]チャンクでのみ機能するようにします。これは、マージソートを使用するとかなり簡単です。

典型的な効果的なマージソートは、スレッドとマージタスクのプールを使用して、入力パーティションを分割し、タスクが[L1キャッシュサイズ]よりも小さいパーティションを取得するまで子マージを生成し、インプレースソートを使用します。クイックソートのように、最後のビットを終了します。

分割は、タスクが挿入ソートを実行するときにデータを移動する1つの追加の[データサイズ]バッファーを使用して実行できます。クイックソートが実行された後です。memcopyingの必要はないはずです。

データを考慮せずに、シングルスレッドのインラインコードを取得して複数のスレッドで機能させるだけでは効果がありません。

于 2012-06-27T09:03:40.653 に答える
1

もちろん、多くのキャッシュミスがあります。覚えておく必要があるのは、CPU と L3 キャッシュと RAM の間にバスが 1 つしかないということです。そのため、スレッド 1 がメモリの読み取りでビジー状態になり、別のスレッドが別のメモリを使用して同じことを実行できるようにするために中断された場合、キャッシュをリロードする必要があります。これは、プロセスが中断されたときにも発生します。プロセスが実行を再開すると、キャッシュをリロードする必要があります。

キャッシュ ミスを制限するには、スレッドの数を物理コアの数に制限します (ハイパー スレッド コアは無視します)。コアよりも多くのスレッドがある場合、スレッドの切り替えがあるたびにキャッシュを更新する必要があります。スレッドとコアの数が同じであれば、キャッシュが別のスレッド/プロセスに失われる可能性が低くなります。num_cores - 1 スレッドを試して、それが役立つかどうかを確認します。

また、コードは非常に大きく、コメント化されていません。正確に何をしているのかわかりにくい。

于 2012-06-27T08:46:04.040 に答える