O(log n) クエリと O(n log n) 前処理/スペースは、次のプロパティを持つ多数区間を見つけて使用することで実現できます。
- 入力配列の各値に対して、1 つまたは複数の多数区間が存在する可能性があります (または、これらの値を持つ要素がまばらすぎる場合は存在しない可能性があります。長さ 1 の多数区間は必要ありません。長さ 1 の多数区間は、サイズのクエリ間隔にのみ役立つ可能性があるためです) 1 は、特別なケースとしてより適切に処理されます)。
- クエリ間隔がこれらの多数間隔の 1 つに完全に収まっている場合、対応する値 はこのクエリ間隔の多数要素である可能性があります。
- クエリ間隔を完全に含む多数間隔が存在しない場合、対応する値 は、このクエリ間隔の多数要素にはなりません。
- 入力配列の各要素は、O(log n)多数区間でカバーされます。
言い換えれば、多数区間の唯一の目的は、任意のクエリ インターバルに対して O(log n) 個の多数要素候補を提供することです。
このアルゴリズムは、次のデータ構造を使用します。
- 入力配列の各値の位置のリスト(
map<Value, vector<Position>>
)。代わりunordered_map
に、パフォーマンスを向上させるためにここで使用することもできます (ただし、構造 #3 が適切な順序で満たされるように、すべてのキーを抽出して並べ替える必要があります)。
- 各値の大多数区間のリスト( )。
vector<Interval>
- クエリを処理するためのデータ構造 (
vector<small_map<Value, Data>>
)。whereには、指定されたvalueData
を持つ要素の次/前の位置を指す構造 #1 からの適切なベクトルの 2 つのインデックスが含まれます。更新: @justhalf のおかげで、指定された値を持つ要素の累積頻度を格納する方が適切です。ペアの並べ替えられたベクトルとして実装できます。前処理により、既に並べ替えられた順序で要素が追加され、クエリは線形検索にのみ使用されます。Data
small_map
small_map
前処理:
- 入力配列をスキャンし、現在の位置を構造体 #1 の適切なベクトルにプッシュします。
- 構造体 #1 のすべてのベクトルに対して、手順 3 .. 4 を実行します。
- 位置のリストを多数区間のリストに変換します。以下の詳細を参照してください。
- 多数区間の 1 つに含まれる入力配列のインデックスごとに、データを構造 #3 の適切な要素に挿入します:値と、この値を持つ前/次の要素の位置 (またはこの値の累積頻度)。
クエリ:
- クエリ間隔の長さが 1 の場合、ソース配列の対応する要素を返します。
- クエリ間隔の開始点については、3 番目の構造体のベクトルの対応する要素を取得します。マップの各要素に対して、ステップ 3 を実行します。クエリ間隔の終了点に対応するマップのすべての要素をこのマップと並行してスキャンして、ステップ 3 で O(1) の複雑さを許可します (O(log log n) ではなく)。
- クエリ間隔の終点に対応するマップに一致する値が含まれている場合、 を計算し
s3[stop][value].prev - s3[start][value].next + 1
ます。クエリ間隔の半分より大きい場合は、valueを返します。次/前のインデックスの代わりに累積度数を使用する場合は、s3[stop+1][value].freq - s3[start][value].freq
代わりに計算します。
- ステップ 3 で何も見つからなかった場合は、"Nothing" を返します。
アルゴリズムの主要部分は、位置のリストから多数決区間を取得することです。
- リスト内の各位置に重みを割り当てます:
number_of_matching_values_to_the_left - number_of_nonmatching_values_to_the_left
.
- 重みのみを厳密に降順に (貪欲に)フィルター処理して、"プレフィックス" 配列に入れます
for (auto x: positions) if (x < prefix.back()) prefix.push_back(x);
。
- 厳密に昇順 (貪欲に、逆方向) で重みのみをフィルター処理して、"接尾辞" 配列: にし
reverse(positions); for (auto x: positions) if (x > suffix.back()) suffix.push_back(x);
ます。
- 「プレフィックス」配列と「サフィックス」配列を一緒にスキャンし、すべての「プレフィックス」要素から「サフィックス」配列の対応する場所まで、およびすべての「サフィックス」要素から「プレフィックス」配列の対応する場所までの間隔を見つけます。(すべての「接尾辞」要素の重みが指定された「接頭辞」要素よりも小さい場合、またはそれらの位置がその右側にない場合、インターバルは生成されません。指定された「接頭辞」要素とまったく同じ重みを持つ「接尾辞」要素がない場合、より大きな重みを持つ最も近い「接尾辞」要素を取得し、この重み差で間隔を右側に拡張します)。
- 重複する間隔をマージします。
多数区間のプロパティ 1 .. 3 は、このアルゴリズムによって保証されます。プロパティ #4 に関しては、多数区間の最大数でいくつかの要素をカバーすることが想像できる唯一の方法は、次のようなものです11111111222233455666677777777
。ここでは要素4
が間隔で覆われて2 * log n
いるため、このプロパティは満たされているようです。この記事の最後で、このプロパティのより正式な証明を参照してください。
例:
入力配列 "0 1 2 0 0 1 1 0" の場合、次の位置リストが生成されます。
value positions
0 0 3 4 7
1 1 5 6
2 2
value の位置は0
、次のプロパティを取得します。
weights: 0:1 3:0 4:1 7:0
prefix: 0:1 3:0 (strictly decreasing)
suffix: 4:1 7:0 (strictly increasing when scanning backwards)
intervals: 0->4 3->7 4->0 7->3
merged intervals: 0-7
value の位置は1
、次のプロパティを取得します。
weights: 1:0 5:-2 6:-1
prefix: 1:0 5:-2
suffix: 1:0 6:-1
intervals: 1->none 5->6+1 6->5-1 1->none
merged intervals: 4-7
クエリ データ構造:
positions value next prev
0 0 0 x
1..2 0 1 0
3 0 1 1
4 0 2 2
4 1 1 x
5 0 3 2
...
クエリ [0,4]:
prev[4][0]-next[0][0]+1=2-0+1=3
query size=5
3>2.5, returned result 0
クエリ [2,5]:
prev[5][0]-next[2][0]+1=2-1+1=2
query size=4
2=2, returned result "none"
要素 "1" の検査は試行されないことに注意してください。これは、多数区間にこれらの区間のいずれも含まれていないためです。
プロパティ #4 の証明:
大多数区間は、すべての要素の厳密に 1/3 以上が対応する値を持つように構築されます。any*(m-1) value*m any*m
この比率は、たとえば のようなサブアレイでは 1/3 に最も近くなります01234444456789
。
この証明をより明確にするために、各間隔を 2D の点として表すことができます。つまり、可能なすべての開始点を横軸で表し、すべての可能な終了点を縦軸で表します (下の図を参照)。
すべての有効な区間は、対角線上または上にあります。白い四角形は、一部の配列要素をカバーするすべての間隔を表します (右下隅に単位サイズの間隔として表されます)。
この白い長方形を、同じ右下隅を共有するサイズ 1、2、4、8、16、... の正方形で覆いましょう。これは、白い領域を黄色の領域と同様の O(log n) 領域 (および、このアルゴリズムでは無視されるサイズ 1 の単一間隔を含むサイズ 1 の単一正方形) に分割します。
黄色の領域に配置できる過半数区間の数を数えましょう。1 つの間隔 (対角隅に最も近い位置) は、対角隅から最も遠い間隔に属する要素の 1/4 を占めます (この最大の間隔には、黄色の領域の任意の間隔に属するすべての要素が含まれます)。これは、最小間隔には、黄色の領域全体で使用可能な値の1/12 を厳密に超える値が含まれることを意味します。したがって、黄色の領域に 12 個の間隔を配置しようとすると、さまざまな値に対して十分な要素がありません。したがって、黄色の領域には 11 以上の過半数区間を含めることはできません。11 * log n
また、白い四角形には多数区間よりも多くの区間を含めることはできません。証明完了。
11 * log n
過大評価です。先に述べたように、いくつかの要素をカバーする2 * log n
過半数以上の区間を想像するのは困難です。そして、この値でさえ、多数区間をカバーする平均数よりもはるかに大きくなっています。
C++11 実装。ideoneまたはここで参照してください。
#include <iostream>
#include <vector>
#include <map>
#include <algorithm>
#include <functional>
#include <random>
constexpr int SrcSize = 1000000;
constexpr int NQueries = 100000;
using src_vec_t = std::vector<int>;
using index_vec_t = std::vector<int>;
using weight_vec_t = std::vector<int>;
using pair_vec_t = std::vector<std::pair<int, int>>;
using index_map_t = std::map<int, index_vec_t>;
using interval_t = std::pair<int, int>;
using interval_vec_t = std::vector<interval_t>;
using small_map_t = std::vector<std::pair<int, int>>;
using query_vec_t = std::vector<small_map_t>;
constexpr int None = -1;
constexpr int Junk = -2;
src_vec_t generate_e()
{ // good query length = 3
src_vec_t src;
std::random_device rd;
std::default_random_engine eng{rd()};
auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);
for (int i = 0; i < SrcSize; ++i)
{
int x = exp();
src.push_back(x);
//std::cout << x << ' ';
}
return src;
}
src_vec_t generate_ep()
{ // good query length = 500
src_vec_t src;
std::random_device rd;
std::default_random_engine eng{rd()};
auto exp = std::bind(std::exponential_distribution<>{0.4}, eng);
auto poisson = std::bind(std::poisson_distribution<int>{100}, eng);
while (int(src.size()) < SrcSize)
{
int x = exp();
int n = poisson();
for (int i = 0; i < n; ++i)
{
src.push_back(x);
//std::cout << x << ' ';
}
}
return src;
}
src_vec_t generate()
{
//return generate_e();
return generate_ep();
}
int trivial(const src_vec_t& src, interval_t qi)
{
int count = 0;
int majorityElement = 0; // will be assigned before use for valid args
for (int i = qi.first; i <= qi.second; ++i)
{
if (count == 0)
majorityElement = src[i];
if (src[i] == majorityElement)
++count;
else
--count;
}
count = 0;
for (int i = qi.first; i <= qi.second; ++i)
{
if (src[i] == majorityElement)
count++;
}
if (2 * count > qi.second + 1 - qi.first)
return majorityElement;
else
return None;
}
index_map_t sort_ind(const src_vec_t& src)
{
int ind = 0;
index_map_t im;
for (auto x: src)
im[x].push_back(ind++);
return im;
}
weight_vec_t get_weights(const index_vec_t& indexes)
{
weight_vec_t weights;
for (int i = 0; i != int(indexes.size()); ++i)
weights.push_back(2 * i - indexes[i]);
return weights;
}
pair_vec_t get_prefix(const index_vec_t& indexes, const weight_vec_t& weights)
{
pair_vec_t prefix;
for (int i = 0; i != int(indexes.size()); ++i)
if (prefix.empty() || weights[i] < prefix.back().second)
prefix.emplace_back(indexes[i], weights[i]);
return prefix;
}
pair_vec_t get_suffix(const index_vec_t& indexes, const weight_vec_t& weights)
{
pair_vec_t suffix;
for (int i = indexes.size() - 1; i >= 0; --i)
if (suffix.empty() || weights[i] > suffix.back().second)
suffix.emplace_back(indexes[i], weights[i]);
std::reverse(suffix.begin(), suffix.end());
return suffix;
}
interval_vec_t get_intervals(const pair_vec_t& prefix, const pair_vec_t& suffix)
{
interval_vec_t intervals;
int prev_suffix_index = 0; // will be assigned before use for correct args
int prev_suffix_weight = 0; // same assumptions
for (int ind_pref = 0, ind_suff = 0; ind_pref != int(prefix.size());)
{
auto i_pref = prefix[ind_pref].first;
auto w_pref = prefix[ind_pref].second;
if (ind_suff != int(suffix.size()))
{
auto i_suff = suffix[ind_suff].first;
auto w_suff = suffix[ind_suff].second;
if (w_pref <= w_suff)
{
auto beg = std::max(0, i_pref + w_pref - w_suff);
if (i_pref < i_suff)
intervals.emplace_back(beg, i_suff + 1);
if (w_pref == w_suff)
++ind_pref;
++ind_suff;
prev_suffix_index = i_suff;
prev_suffix_weight = w_suff;
continue;
}
}
// ind_suff out of bounds or w_pref > w_suff:
auto end = prev_suffix_index + prev_suffix_weight - w_pref + 1;
// end may be out-of-bounds; that's OK if overflow is not possible
intervals.emplace_back(i_pref, end);
++ind_pref;
}
return intervals;
}
interval_vec_t merge(const interval_vec_t& from)
{
using endpoints_t = std::vector<std::pair<int, bool>>;
endpoints_t ep(2 * from.size());
std::transform(from.begin(), from.end(), ep.begin(),
[](interval_t x){ return std::make_pair(x.first, true); });
std::transform(from.begin(), from.end(), ep.begin() + from.size(),
[](interval_t x){ return std::make_pair(x.second, false); });
std::sort(ep.begin(), ep.end());
interval_vec_t to;
int start; // will be assigned before use for correct args
int overlaps = 0;
for (auto& x: ep)
{
if (x.second) // begin
{
if (overlaps++ == 0)
start = x.first;
}
else // end
{
if (--overlaps == 0)
to.emplace_back(start, x.first);
}
}
return to;
}
interval_vec_t get_intervals(const index_vec_t& indexes)
{
auto weights = get_weights(indexes);
auto prefix = get_prefix(indexes, weights);
auto suffix = get_suffix(indexes, weights);
auto intervals = get_intervals(prefix, suffix);
return merge(intervals);
}
void update_qv(
query_vec_t& qv,
int value,
const interval_vec_t& intervals,
const index_vec_t& iv)
{
int iv_ind = 0;
int qv_ind = 0;
int accum = 0;
for (auto& interval: intervals)
{
int i_begin = interval.first;
int i_end = std::min<int>(interval.second, qv.size() - 1);
while (iv[iv_ind] < i_begin)
{
++accum;
++iv_ind;
}
qv_ind = std::max(qv_ind, i_begin);
while (qv_ind <= i_end)
{
qv[qv_ind].emplace_back(value, accum);
if (iv[iv_ind] == qv_ind)
{
++accum;
++iv_ind;
}
++qv_ind;
}
}
}
void print_preprocess_stat(const index_map_t& im, const query_vec_t& qv)
{
double sum_coverage = 0.;
int max_coverage = 0;
for (auto& x: qv)
{
sum_coverage += x.size();
max_coverage = std::max<int>(max_coverage, x.size());
}
std::cout << " size = " << qv.size() - 1 << '\n';
std::cout << " values = " << im.size() << '\n';
std::cout << " max coverage = " << max_coverage << '\n';
std::cout << " avg coverage = " << sum_coverage / qv.size() << '\n';
}
query_vec_t preprocess(const src_vec_t& src)
{
query_vec_t qv(src.size() + 1);
auto im = sort_ind(src);
for (auto& val: im)
{
auto intervals = get_intervals(val.second);
update_qv(qv, val.first, intervals, val.second);
}
print_preprocess_stat(im, qv);
return qv;
}
int do_query(const src_vec_t& src, const query_vec_t& qv, interval_t qi)
{
if (qi.first == qi.second)
return src[qi.first];
auto b = qv[qi.first].begin();
auto e = qv[qi.second + 1].begin();
while (b != qv[qi.first].end() && e != qv[qi.second + 1].end())
{
if (b->first < e->first)
{
++b;
}
else if (e->first < b->first)
{
++e;
}
else // if (e->first == b->first)
{
// hope this doesn't overflow
if (2 * (e->second - b->second) > qi.second + 1 - qi.first)
return b->first;
++b;
++e;
}
}
return None;
}
int main()
{
std::random_device rd;
std::default_random_engine eng{rd()};
auto poisson = std::bind(std::poisson_distribution<int>{500}, eng);
int majority = 0;
int nonzero = 0;
int failed = 0;
auto src = generate();
auto qv = preprocess(src);
for (int i = 0; i < NQueries; ++i)
{
int size = poisson();
auto ud = std::uniform_int_distribution<int>(0, src.size() - size - 1);
int start = ud(eng);
int stop = start + size;
auto res1 = do_query(src, qv, {start, stop});
auto res2 = trivial(src, {start, stop});
//std::cout << size << ": " << res1 << ' ' << res2 << '\n';
if (res2 != res1)
++failed;
if (res2 != None)
{
++majority;
if (res2 != 0)
++nonzero;
}
}
std::cout << "majority elements = " << 100. * majority / NQueries << "%\n";
std::cout << " nonzero elements = " << 100. * nonzero / NQueries << "%\n";
std::cout << " queries = " << NQueries << '\n';
std::cout << " failed = " << failed << '\n';
return 0;
}
関連作業:
この質問に対する他の回答で指摘されているように、この問題が既に解決されている他の作業があります: S. Durocher、M. He、I Munro、PK Nicholson、M. Skala による「定数時間と線形空間における範囲の多数派」。
このホワイト ペーパーで提示されているアルゴリズムは、クエリ時間:O(1)
の代わりに、O(log n)
およびスペース:のO(n)
代わりに、より優れた漸近的な複雑さを備えていますO(n log n)
。
スペースの複雑さが向上すると、このアルゴリズムはより大きなデータセットを処理できます (この回答で提案されているアルゴリズムと比較して)。前処理されたデータに必要なメモリが少なく、より規則的なデータ アクセス パターンがあれば、おそらく、このアルゴリズムはデータをより迅速に前処理できます。しかし、クエリ時間ではそれほど簡単ではありません...
論文からアルゴリズムに最も適した入力データがあるとしましょう: n=1000000000 (2013 年に 10..30 ギガバイトを超えるメモリを搭載したシステムを想像するのは困難です)。
この回答で提案されているアルゴリズムは、クエリごとに最大 120 (または 2 クエリ境界 * 2 * log n) の要素を処理する必要があります。ただし、線形検索と同様に、非常に単純な操作を実行します。また、連続する 2 つのメモリ領域にシーケンシャルにアクセスするため、キャッシュ フレンドリーです。
この論文のアルゴリズムは、クエリごとに最大 20 の操作 (または 2 つのクエリ境界 * 5 つの候補 * 2 つのウェーブレット ツリー レベル) を実行する必要があります。これは 6 分の 1 です。ただし、各操作はより複雑です。ビット カウンター自体の簡潔な表現の各クエリには、線形検索が含まれています (つまり、1 回ではなく 20 回の線形検索を意味します)。最悪なことに、そのような各操作は複数の独立したメモリ領域にアクセスする必要があるため (クエリのサイズが非常に小さく、したがって 4 倍のサイズが非常に小さい場合を除きます)、クエリはキャッシュに適していません。これは、各クエリ (while は一定時間の操作) がかなり遅く、おそらくここで提案されているアルゴリズムよりも遅いことを意味します。入力配列のサイズを小さくすると、ここで提案されたアルゴリズムがより高速になる可能性が高くなります。
この論文のアルゴリズムの実際的な欠点は、ウェーブレット ツリーと簡潔なビット カウンターの実装です。それらを最初から実装すると、かなり時間がかかる場合があります。既存の実装を使用することは必ずしも便利ではありません。