3-dの点は、(x、y、z)によって定義されます。任意の2点(X、Y、Z)と(x、y、z)の間の距離dは、d = Sqrt [(Xx)^ 2 +(Yy)^ 2 +(Zz)^2]です。現在、ファイルには100万のエントリがあり、各エントリは特定の順序ではなく、空間内のあるポイントです。任意のポイント(a、b、c)が与えられた場合、それに最も近い10ポイントを見つけます。百万ポイントをどのように保存し、そのデータ構造からそれらの10ポイントをどのように取得しますか。
12 に答える
ミリオンポイントは小さな数字です。ここでは最も単純なアプローチが機能します (KDTree に基づくコードは低速です (1 つのポイントのみを照会するため))。
力ずくのアプローチ (時間 ~1 秒)
#!/usr/bin/env python
import numpy
NDIM = 3 # number of dimensions
# read points into array
a = numpy.fromfile('million_3D_points.txt', sep=' ')
a.shape = a.size / NDIM, NDIM
point = numpy.random.uniform(0, 100, NDIM) # choose random point
print 'point:', point
d = ((a-point)**2).sum(axis=1) # compute distances
ndx = d.argsort() # indirect sort
# print 10 nearest points to the chosen one
import pprint
pprint.pprint(zip(a[ndx[:10]], d[ndx[:10]]))
それを実行します:
$ time python nearest.py
point: [ 69.06310224 2.23409409 50.41979143]
[(array([ 69., 2., 50.]), 0.23500677815852947),
(array([ 69., 2., 51.]), 0.39542392750839772),
(array([ 69., 3., 50.]), 0.76681859086988302),
(array([ 69., 3., 50.]), 0.76681859086988302),
(array([ 69., 3., 51.]), 0.9272357402197513),
(array([ 70., 2., 50.]), 1.1088022980015722),
(array([ 70., 2., 51.]), 1.2692194473514404),
(array([ 70., 2., 51.]), 1.2692194473514404),
(array([ 70., 3., 51.]), 1.801031260062794),
(array([ 69., 1., 51.]), 1.8636121147970444)]
real 0m1.122s
user 0m1.010s
sys 0m0.120s
百万の 3D ポイントを生成するスクリプトは次のとおりです。
#!/usr/bin/env python
import random
for _ in xrange(10**6):
print ' '.join(str(random.randrange(100)) for _ in range(3))
出力:
$ head million_3D_points.txt
18 56 26
19 35 74
47 43 71
82 63 28
43 82 0
34 40 16
75 85 69
88 58 3
0 63 90
81 78 98
そのコードを使用して、より複雑なデータ構造とアルゴリズムをテストできます (たとえば、実際に消費するメモリが少ないか、上記の最も単純なアプローチよりも高速かどうかなど)。現時点では、作業コードを含む唯一の回答であることに注意してください。
KDTreeに基づくソリューション(時間 ~1.4 秒)
#!/usr/bin/env python
import numpy
NDIM = 3 # number of dimensions
# read points into array
a = numpy.fromfile('million_3D_points.txt', sep=' ')
a.shape = a.size / NDIM, NDIM
point = [ 69.06310224, 2.23409409, 50.41979143] # use the same point as above
print 'point:', point
from scipy.spatial import KDTree
# find 10 nearest points
tree = KDTree(a, leafsize=a.shape[0]+1)
distances, ndx = tree.query([point], k=10)
# print 10 nearest points to the chosen one
print a[ndx]
それを実行します:
$ time python nearest_kdtree.py
point: [69.063102240000006, 2.2340940900000001, 50.419791429999997]
[[[ 69. 2. 50.]
[ 69. 2. 51.]
[ 69. 3. 50.]
[ 69. 3. 50.]
[ 69. 3. 51.]
[ 70. 2. 50.]
[ 70. 2. 51.]
[ 70. 2. 51.]
[ 70. 3. 51.]
[ 69. 1. 51.]]]
real 0m1.359s
user 0m1.280s
sys 0m0.080s
C++ での部分的な並べ替え (時間 ~1.1 秒)
// $ g++ nearest.cc && (time ./a.out < million_3D_points.txt )
#include <algorithm>
#include <iostream>
#include <vector>
#include <boost/lambda/lambda.hpp> // _1
#include <boost/lambda/bind.hpp> // bind()
#include <boost/tuple/tuple_io.hpp>
namespace {
typedef double coord_t;
typedef boost::tuple<coord_t,coord_t,coord_t> point_t;
coord_t distance_sq(const point_t& a, const point_t& b) { // or boost::geometry::distance
coord_t x = a.get<0>() - b.get<0>();
coord_t y = a.get<1>() - b.get<1>();
coord_t z = a.get<2>() - b.get<2>();
return x*x + y*y + z*z;
}
}
int main() {
using namespace std;
using namespace boost::lambda; // _1, _2, bind()
// read array from stdin
vector<point_t> points;
cin.exceptions(ios::badbit); // throw exception on bad input
while(cin) {
coord_t x,y,z;
cin >> x >> y >> z;
points.push_back(boost::make_tuple(x,y,z));
}
// use point value from previous examples
point_t point(69.06310224, 2.23409409, 50.41979143);
cout << "point: " << point << endl; // 1.14s
// find 10 nearest points using partial_sort()
// Complexity: O(N)*log(m) comparisons (O(N)*log(N) worst case for the implementation)
const size_t m = 10;
partial_sort(points.begin(), points.begin() + m, points.end(),
bind(less<coord_t>(), // compare by distance to the point
bind(distance_sq, _1, point),
bind(distance_sq, _2, point)));
for_each(points.begin(), points.begin() + m, cout << _1 << "\n"); // 1.16s
}
それを実行します:
g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt )
point: (69.0631 2.23409 50.4198)
(69 2 50)
(69 2 51)
(69 3 50)
(69 3 50)
(69 3 51)
(70 2 50)
(70 2 51)
(70 2 51)
(70 3 51)
(69 1 51)
real 0m1.152s
user 0m1.140s
sys 0m0.010s
C++ のプライオリティ キュー (時間 ~1.2 秒)
#include <algorithm> // make_heap
#include <functional> // binary_function<>
#include <iostream>
#include <boost/range.hpp> // boost::begin(), boost::end()
#include <boost/tr1/tuple.hpp> // get<>, tuple<>, cout <<
namespace {
typedef double coord_t;
typedef std::tr1::tuple<coord_t,coord_t,coord_t> point_t;
// calculate distance (squared) between points `a` & `b`
coord_t distance_sq(const point_t& a, const point_t& b) {
// boost::geometry::distance() squared
using std::tr1::get;
coord_t x = get<0>(a) - get<0>(b);
coord_t y = get<1>(a) - get<1>(b);
coord_t z = get<2>(a) - get<2>(b);
return x*x + y*y + z*z;
}
// read from input stream `in` to the point `point_out`
std::istream& getpoint(std::istream& in, point_t& point_out) {
using std::tr1::get;
return (in >> get<0>(point_out) >> get<1>(point_out) >> get<2>(point_out));
}
// Adaptable binary predicate that defines whether the first
// argument is nearer than the second one to given reference point
template<class T>
class less_distance : public std::binary_function<T, T, bool> {
const T& point;
public:
less_distance(const T& reference_point) : point(reference_point) {}
bool operator () (const T& a, const T& b) const {
return distance_sq(a, point) < distance_sq(b, point);
}
};
}
int main() {
using namespace std;
// use point value from previous examples
point_t point(69.06310224, 2.23409409, 50.41979143);
cout << "point: " << point << endl;
const size_t nneighbours = 10; // number of nearest neighbours to find
point_t points[nneighbours+1];
// populate `points`
for (size_t i = 0; getpoint(cin, points[i]) && i < nneighbours; ++i)
;
less_distance<point_t> less_distance_point(point);
make_heap (boost::begin(points), boost::end(points), less_distance_point);
// Complexity: O(N*log(m))
while(getpoint(cin, points[nneighbours])) {
// add points[-1] to the heap; O(log(m))
push_heap(boost::begin(points), boost::end(points), less_distance_point);
// remove (move to last position) the most distant from the
// `point` point; O(log(m))
pop_heap (boost::begin(points), boost::end(points), less_distance_point);
}
// print results
push_heap (boost::begin(points), boost::end(points), less_distance_point);
// O(m*log(m))
sort_heap (boost::begin(points), boost::end(points), less_distance_point);
for (size_t i = 0; i < nneighbours; ++i) {
cout << points[i] << ' ' << distance_sq(points[i], point) << '\n';
}
}
それを実行します:
$ g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt )
point: (69.0631 2.23409 50.4198)
(69 2 50) 0.235007
(69 2 51) 0.395424
(69 3 50) 0.766819
(69 3 50) 0.766819
(69 3 51) 0.927236
(70 2 50) 1.1088
(70 2 51) 1.26922
(70 2 51) 1.26922
(70 3 51) 1.80103
(69 1 51) 1.86361
real 0m1.174s
user 0m1.180s
sys 0m0.000s
Linear Search ベースのアプローチ (時間 ~1.15 秒)
// $ g++ -O3 nearest.cc && (time ./a.out < million_3D_points.txt )
#include <algorithm> // sort
#include <functional> // binary_function<>
#include <iostream>
#include <boost/foreach.hpp>
#include <boost/range.hpp> // begin(), end()
#include <boost/tr1/tuple.hpp> // get<>, tuple<>, cout <<
#define foreach BOOST_FOREACH
namespace {
typedef double coord_t;
typedef std::tr1::tuple<coord_t,coord_t,coord_t> point_t;
// calculate distance (squared) between points `a` & `b`
coord_t distance_sq(const point_t& a, const point_t& b);
// read from input stream `in` to the point `point_out`
std::istream& getpoint(std::istream& in, point_t& point_out);
// Adaptable binary predicate that defines whether the first
// argument is nearer than the second one to given reference point
class less_distance : public std::binary_function<point_t, point_t, bool> {
const point_t& point;
public:
explicit less_distance(const point_t& reference_point)
: point(reference_point) {}
bool operator () (const point_t& a, const point_t& b) const {
return distance_sq(a, point) < distance_sq(b, point);
}
};
}
int main() {
using namespace std;
// use point value from previous examples
point_t point(69.06310224, 2.23409409, 50.41979143);
cout << "point: " << point << endl;
less_distance nearer(point);
const size_t nneighbours = 10; // number of nearest neighbours to find
point_t points[nneighbours];
// populate `points`
foreach (point_t& p, points)
if (! getpoint(cin, p))
break;
// Complexity: O(N*m)
point_t current_point;
while(cin) {
getpoint(cin, current_point); //NOTE: `cin` fails after the last
//point, so one can't lift it up to
//the while condition
// move to the last position the most distant from the
// `point` point; O(m)
foreach (point_t& p, points)
if (nearer(current_point, p))
// found point that is nearer to the `point`
//NOTE: could use insert (on sorted sequence) & break instead
//of swap but in that case it might be better to use
//heap-based algorithm altogether
std::swap(current_point, p);
}
// print results; O(m*log(m))
sort(boost::begin(points), boost::end(points), nearer);
foreach (point_t p, points)
cout << p << ' ' << distance_sq(p, point) << '\n';
}
namespace {
coord_t distance_sq(const point_t& a, const point_t& b) {
// boost::geometry::distance() squared
using std::tr1::get;
coord_t x = get<0>(a) - get<0>(b);
coord_t y = get<1>(a) - get<1>(b);
coord_t z = get<2>(a) - get<2>(b);
return x*x + y*y + z*z;
}
std::istream& getpoint(std::istream& in, point_t& point_out) {
using std::tr1::get;
return (in >> get<0>(point_out) >> get<1>(point_out) >> get<2>(point_out));
}
}
測定では、ほとんどの時間がファイルからの配列の読み取りに費やされていることが示されていますが、実際の計算にかかる時間は桁違いに少なくなります。
百万のエントリがすでにファイルにある場合は、それらすべてをメモリ内のデータ構造にロードする必要はありません。これまでに見つかった上位10ポイントの配列を保持し、100万ポイントをスキャンして、上位10ポイントのリストを更新します。
これはポイント数のO(n)です。
ポイントをk次元ツリー(kdツリー)に格納できます。Kd木は、最近傍探索(特定の点に最も近いn個の点を見つける)用に最適化されています。
これは、無理をしようとしないかどうかをテストするトリッキーな質問だと思います。
人々がすでに上で示した最も単純なアルゴリズムを考えてみましょう: これまでで最高の 10 個の候補のテーブルを保持し、すべてのポイントを 1 つずつ調べます。これまでのベスト 10 のいずれよりも近いポイントを見つけた場合は、それを置き換えます。複雑さは何ですか?ファイルの各ポイントを1 回見て、その距離 (または実際には距離の 2 乗) を計算し、10 番目に近いポイントと比較する必要があります。より良い場合は、これまでのベスト 10 の表の適切な場所に挿入します。
では、複雑さは何ですか?各点を 1 回見るので、距離の計算は n 回、比較は n 回です。ポイントがより良い場合は、それを正しい位置に挿入する必要があります。これにはさらに比較が必要ですが、最適な候補のテーブルは一定のサイズ 10 であるため、これは一定の要素です。
最終的に、ポイント数で O(n) の線形時間で実行されるアルゴリズムになります。
しかし今、そのようなアルゴリズムの下限を考えてみてください。入力データに順序がない場合は、各点を調べて、最も近い点ではないかどうかを確認する必要があります。私が見る限り、下限は Omega(n) であるため、上記のアルゴリズムが最適です。
距離を計算する必要はありません。距離の 2 乗だけでニーズを満たすことができます。もっと速いはずです。つまり、sqrtビットをスキップできます。
この質問は基本的に、スペース パーティショニング アルゴリズムに関する知識や直感をテストするものです。データをoctreeに格納するのが最善の策だと思います。これは、この種の問題 (数百万の頂点の格納、レイ トレーシング、衝突の検出など) を処理する 3D エンジンで一般的に使用されます。ルックアップ時間はlog(n)、最悪の場合のシナリオのオーダーになります (私は信じています)。
これは宿題の問題じゃないですよね?;-)
私の考え: すべてのポイントを繰り返し処理し、それらを最小ヒープまたは制限された優先度キューに入れ、ターゲットからの距離をキーにします。
簡単なアルゴリズム:
ポイントをタプルのリストとして保存し、ポイントをスキャンして距離を計算し、「最も近い」リストを保持します。
より創造的:
ポイントを領域(「0,0,0」から「50,50,50」、または「0,0,0」から「-20、-20、-20」で記述された立方体など)にグループ化すると、ターゲットポイントからそれらに「インデックスを付ける」ことができます。ターゲットがどのキューブにあるかを確認し、そのキューブ内のポイントのみを検索します。その立方体のポイントが10未満の場合は、「隣接する」立方体などを確認します。
さらに考えてみると、これはあまり良いアルゴリズムではありません。ターゲットポイントが10ポイントよりも立方体の壁に近い場合は、隣接する立方体も検索する必要があります。
kd-treeアプローチを使用して最も近いノードを見つけ、その最も近いノードを削除(またはマーク)して、新しい最も近いノードを再検索します。すすぎ、繰り返します。
任意の2つのポイントP1(x1、y1、z1)およびP2(x2、y2、z2)について、ポイント間の距離がdの場合、次のすべてが真である必要があります。
|x1 - x2| <= d
|y1 - y2| <= d
|z1 - z2| <= d
セット全体を反復処理するときは、最も近い10を保持しますが、最も近い10までの距離も保持します。見るすべてのポイントの距離を計算する前に、これら3つの条件を使用して、複雑さを大幅に軽減してください。
基本的に私の上の最初の2つの答えの組み合わせ。ポイントはファイル内にあるため、メモリに保持する必要はありません。配列または最小ヒープの代わりに、最大ヒープを使用します。これは、10番目に近いポイントよりも短い距離のみをチェックするためです。配列の場合、新しく計算された各距離を、保持した10個の距離すべてと比較する必要があります。最小ヒープの場合、新しく計算された距離ごとに3つの比較を実行する必要があります。最大ヒープでは、新しく計算された距離がルートノードよりも大きい場合にのみ1つの比較を実行します。
この質問にはさらに定義が必要です。
1)データ全体をメモリに保持できるかどうかによって、データに事前インデックスを付けるアルゴリズムに関する決定は大きく異なります。
kdtreesとoctreesを使用すると、データをメモリに保持する必要がなくなり、メモリフットプリントが低くなるだけでなく、ファイル全体を読み取る必要がなくなるため、パフォーマンスが向上します。
ブルートフォースを使用すると、ファイル全体を読み取り、検索する新しいポイントごとに距離を再計算する必要があります。
それでも、これはあなたにとって重要ではないかもしれません。
2)もう1つの要因は、ポイントを検索する必要がある回数です。
JFセバスチャンが述べたように、大規模なデータセットでもブルートフォースの方が速い場合がありますが、彼のベンチマークでは、ディスクからデータセット全体を読み取ることが測定されていることに注意してください(kd-treeまたはoctreeが構築され、どこかに書き込まれると、これは必要ありません)そして、それらは1回の検索のみを測定します。
それぞれの距離を計算し、O(n) 時間で Select(1..10, n) を実行します。それは私が推測する素朴なアルゴリズムでしょう。