Haskell 用の配列ライブラリ Repa は非常に興味深いものであり、簡単なプログラムを作成して、その使用方法を理解しようとしました。また、リストを使用して簡単な実装を作成しましたが、これははるかに高速であることが証明されました。私の主な質問は、以下の Repa コードをどのように改善して最も効率的なものにするか (そしてできれば非常に読みやすいものにすること) です。私はHaskellを使用するのはまったく初めてで、Repaに関する簡単に理解できるチュートリアルを見つけることができませんでした.[ Haskell Wikiにあるものを編集してください. :) たとえば、force または deepSeqArray をいつ使用するかがわかりません。
このプログラムは、次の方法で球体の体積を概算するために使用されます。
- 球体の中心点と半径、および球体を取り囲むことが知られている立方体内の等間隔の座標が指定されます。
- プログラムは各座標を取得し、球の中心までの距離を計算し、それが球の半径よりも小さい場合は、球の合計 (概算) 体積を加算するために使用されます。
リストを使用するバージョンと repa を使用するバージョンの 2 つのバージョンを以下に示します。特にこのユースケースでは、コードが非効率的であることはわかっていますが、後でもっと複雑にするという考えです。
以下の値の場合、"ghc -Odph -fllvm -fforce-recomp -rtsopts -threaded" でコンパイルすると、リスト バージョンは 1.4 秒、repa バージョンは +RTS -N1 で 12 秒、+RTS - で 10 秒かかります。 N2、ただし火花は変換されません (Windows 7 64、GHC 7.0.2、および llvm 2.8 を実行するデュアルコア Intel マシン (Core 2 Duo E7400 @ 2.8 GHz) を使用しています)。(以下のメインの正しい行をコメントアウトして、バージョンの 1 つだけを実行してください。)
助けてくれてありがとう!
import Data.Array.Repa as R
import qualified Data.Vector.Unboxed as V
import Prelude as P
-- Calculate the volume of a sphere by putting it in a bath of coordinates. Generate coordinates (x,y,z) in a cuboid. Then, for each coordinate, check if it is inside the sphere. Sum those coordinates and multiply by the coordinate grid step size to find an approximate volume.
particles = [(0,0,0)] -- used for the list alternative --[(0,0,0),(0,2,0)]
particles_repa = [0,0,0::Double] -- used for the repa alternative, can currently just be one coordinate
-- Radius of the particle
a = 4
-- Generate the coordinates. Could this be done more efficiently, and at the same time simple? In Matlab I would use ndgrid.
step = 0.1 --0.05
xrange = [-10,-10+step..10] :: [Double]
yrange = [-10,-10+step..10]
zrange = [-10,-10+step..10]
-- All coordinates as triples. These are used directly in the list version below.
coords = [(x,y,z) | x <- xrange, y <- yrange, z <- zrange]
---- List code ----
volumeIndividuals = fromIntegral (length particles) * 4*pi*a**3/3
volumeInside = step**3 * fromIntegral (numberInsideParticles particles coords)
numberInsideParticles particles coords = length $ filter (==True) $ P.map (insideParticles particles) coords
insideParticles particles coord = any (==True) $ P.map (insideParticle coord) particles
insideParticle (xc,yc,zc) (xp,yp,zp) = ((xc-xp)^2+(yc-yp)^2+(zc-zp)^2) < a**2
---- End list code ----
---- Repa code ----
-- Put the coordinates in a Nx3 array.
xcoords = P.map (\(x,_,_) -> x) coords
ycoords = P.map (\(_,y,_) -> y) coords
zcoords = P.map (\(_,_,z) -> z) coords
-- Total number of coordinates
num_coords = (length xcoords) ::Int
xcoords_r = fromList (Z :. num_coords :. (1::Int)) xcoords
ycoords_r = fromList (Z :. num_coords :. (1::Int)) ycoords
zcoords_r = fromList (Z :. num_coords :. (1::Int)) zcoords
rcoords = xcoords_r R.++ ycoords_r R.++ zcoords_r
-- Put the particle coordinates in an array, then extend (replicate) this array so that its size becomes the same as that of rcoords
particle = fromList (Z :. (1::Int) :. (3::Int)) particles_repa
particle_slice = slice particle (Z :. (0::Int) :. All)
particle_extended = extend (Z :. num_coords :. All) particle_slice
-- Calculate the squared difference between the (x,y,z) coordinates of the particle and the coordinates of the cuboid.
squared_diff = deepSeqArrays [rcoords,particle_extended] ((force2 rcoords) -^ (force2 particle_extended)) **^ 2
(**^) arr pow = R.map (**pow) arr
xslice = slice squared_diff (Z :. All :. (0::Int))
yslice = slice squared_diff (Z :. All :. (1::Int))
zslice = slice squared_diff (Z :. All :. (2::Int))
-- Calculate the distance between each coordinate and the particle center
sum_squared_diff = [xslice,yslice,zslice] `deepSeqArrays` xslice +^ yslice +^ zslice
-- Do the rest using vector, since I didn't get the repa variant working.
ssd_vec = toVector sum_squared_diff
-- Determine the number of the coordinates that are within the particle (instead of taking the square root to get the distances above, I compare to the square of the radius here, to improve performance)
total_within = fromIntegral (V.length $ V.filter (<a**2) ssd_vec)
--total_within = foldAll (\x acc -> if x < a**2 then acc+1 else acc) 0 sum_squared_diff
-- Finally, calculate an approximation of the volume of the sphere by taking the volume of the cubes with side step, multiplied with the number of coordinates within the sphere.
volumeInside_repa = step**3 * total_within
-- Helper function that shows the size of a 2-D array.
rsize = reverse . listOfShape . (extent :: Array DIM2 Double -> DIM2)
---- End repa code ----
-- Comment out the list or the repa version if you want to time the calculations separately.
main = do
putStrLn $ "Step = " P.++ show step
putStrLn $ "Volume of individual particles = " P.++ show volumeIndividuals
putStrLn $ "Volume of cubes inside particles (list) = " P.++ show volumeInside
putStrLn $ "Volume of cubes inside particles (repa) = " P.++ show volumeInside_repa
編集:上記のようにコードを書いた理由を説明する背景:
私は主に Matlab でコードを書いており、パフォーマンス向上の経験は主にその分野から来ています。Matlab では、通常、行列を直接操作する関数を使用して計算を行い、パフォーマンスを向上させます。上記の問題を Matlab R2010b で実装すると、以下に示すマトリックス バージョンを使用すると 0.9 秒かかり、ネストされたループを使用すると 15 秒かかります。Haskell が Matlab と大きく異なることは知っていますが、Haskell でリストの使用から Repa 配列の使用に移行することで、コードのパフォーマンスが向上することを願っていました。リストからの変換 -> Repa 配列 -> ベクトルがあるのは、それらをより良いものに置き換えるのに十分なスキルがないためです。これが私が入力を求める理由です。:) したがって、上記のタイミングの数値は主観的なものです。パフォーマンスは、言語の能力よりも優れていますが、何を使用するかを決定するのは、それを機能させることができるかどうかに依存するため、現在の私にとって有効な指標です.
tl;dr: 上記の Repa コードがばかげているか病的である可能性があることは理解していますが、それが今できる最善のことです。私はより良い Haskell コードを書けるようになりたいと思っています。あなたがその方向で私を助けてくれることを願っています (dons は既に行っています)。:)
function archimedes_simple()
particles = [0 0 0]';
a = 4;
step = 0.1;
xrange = [-10:step:10];
yrange = [-10:step:10];
zrange = [-10:step:10];
[X,Y,Z] = ndgrid(xrange,yrange,zrange);
dists2 = bsxfun(@minus,X,particles(1)).^2+ ...
bsxfun(@minus,Y,particles(2)).^2+ ...
bsxfun(@minus,Z,particles(3)).^2;
inside = dists2 < a^2;
num_inside = sum(inside(:));
disp('');
disp(['Step = ' num2str(step)]);
disp(['Volume of individual particles = ' num2str(size(particles,2)*4*pi*a^3/3)]);
disp(['Volume of cubes inside particles = ' num2str(step^3*num_inside)]);
end
編集 2 : Repa コードの新しい、より高速でシンプルなバージョン
Repaについてもう少し読んで、少し考えました。以下は、新しい Repa バージョンです。この場合、Repa 拡張関数を使用して、値のリストから x、y、および z 座標を 3 次元配列として作成します (Matlab での ndgrid の動作と同様)。次に、これらの配列をマッピングして、球状粒子までの距離を計算します。最後に、結果として得られた 3 次元距離配列を折り畳み、球内にある座標の数を数え、定数係数を掛けて、おおよその体積を取得します。私のアルゴリズムの実装は、上記の Matlab バージョンに非常に似たものになり、ベクトルへの変換はなくなりました。
新しいバージョンは、私のコンピューターで約 5 秒で実行され、上記から大幅に改善されています。コンパイル中に「スレッド化」を使用し、「+RTS -N2」と組み合わせて使用してもしなくても、タイミングは同じですが、スレッド化バージョンではコンピューターの両方のコアが最大になります。ただし、「-N2」の数滴が 3.1 秒まで実行されるのを見ましたが、後で再現できませんでした。同時に実行されている他のプロセスに非常に敏感なのでしょうか? ベンチマーク時にコンピューターのほとんどのプログラムを終了しましたが、バックグラウンド プロセスなど、まだいくつかのプログラムが実行されています。
「-N2」を使用し、ランタイム スイッチを追加して並列 GC (-qg) をオフにすると、時間は一貫して ~4.1 秒に短縮され、-qa を使用して「OS を使用してスレッド アフィニティを設定する (実験的)」、時間は約 3.5 秒まで短縮されました。「+RTS -s」を使用してプログラムを実行した結果の出力を見ると、-qg を使用して実行される GC ははるかに少なくなります。
今日の午後、楽しみのために、8 コアのコンピューターでコードを実行できるかどうかを確認します。:)
import Data.Array.Repa as R
import Prelude as P
import qualified Data.List as L
-- Calculate the volume of a spherical particle by putting it in a bath of coordinates. Generate coordinates (x,y,z) in a cuboid. Then, for each coordinate, check if it is inside the sphere. Sum those coordinates and multiply by the coordinate grid step size to find an approximate volume.
particles :: [(Double,Double,Double)]
particles = [(0,0,0)]
-- Radius of the spherical particle
a = 4
volume_individuals = fromIntegral (length particles) * 4*pi*a^3/3
-- Generate the coordinates.
step = 0.1
coords_list = [-10,-10+step..10] :: [Double]
num_coords = (length coords_list) :: Int
coords :: Array DIM1 Double
coords = fromList (Z :. (num_coords ::Int)) coords_list
coords_slice :: Array DIM1 Double
coords_slice = slice coords (Z :. All)
-- x, y and z are 3-D arrays, where the same index into each array can be used to find a single coordinate, e.g. (x(i,j,k),y(i,j,k),z(i,j,k)).
x,y,z :: Array DIM3 Double
x = extend (Z :. All :. num_coords :. num_coords) coords_slice
y = extend (Z :. num_coords :. All :. num_coords) coords_slice
z = extend (Z :. num_coords :. num_coords :. All) coords_slice
-- Calculate the squared distance from each coordinate to the center of the spherical particle.
dist2 :: (Double, Double, Double) -> Array DIM3 Double
dist2 particle = ((R.map (squared_diff xp) x) + (R.map (squared_diff yp) y) + (R.map ( squared_diff zp) z))
where
(xp,yp,zp) = particle
squared_diff xi xa = (xa-xi)^2
-- Count how many of the coordinates are within the spherical particle.
num_inside_particle :: (Double,Double,Double) -> Double
num_inside_particle particle = foldAll (\acc x -> if x<a^2 then acc+1 else acc) 0 (force $ dist2 particle)
-- Calculate the approximate volume covered by the spherical particle.
volume_inside :: [Double]
volume_inside = P.map ((*step^3) . num_inside_particle) particles
main = do
putStrLn $ "Step = " P.++ show step
putStrLn $ "Volume of individual particles = " P.++ show volume_individuals
putStrLn $ "Volume of cubes inside each particle (repa) = " P.++ (P.concat . ( L.intersperse ", ") . P.map show) volume_inside
-- As an alternative, y and z could be generated from x, but this was slightly slower in my tests (~0.4 s).
--y = permute_dims_3D x
--z = permute_dims_3D y
-- Permute the dimensions in a 3-D array, (forward, cyclically)
permute_dims_3D a = backpermute (swap e) swap a
where
e = extent a
swap (Z :. i:. j :. k) = Z :. k :. i :. j
新しいコードのスペース プロファイリング
Don Stewart が作成したのと同じタイプのプロファイルですが、新しい Repa コード用です。