26

問題の説明:

cRAMメモリにロードされた大きなマトリックスがあります。私の目標は、並列処理によって読み取り専用アクセスを取得することです。ただしdoSNOWdoMPIbig.matrix、 などを使用して接続を作成すると、使用される RAM の量が劇的に増加します。

すべてのデータのローカル コピーを作成せずに、すべてのプロセスが読み取り可能な共有メモリを適切に作成する方法はありますか?

例:

libs<-function(libraries){# Installs missing libraries and then load them
  for (lib in libraries){
    if( !is.element(lib, .packages(all.available = TRUE)) ) {
      install.packages(lib)
    }
    library(lib,character.only = TRUE)
  }
}

libra<-list("foreach","parallel","doSNOW","bigmemory")
libs(libra)

#create a matrix of size 1GB aproximatelly
c<-matrix(runif(10000^2),10000,10000)
#convert it to bigmatrix
x<-as.big.matrix(c)
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
out<-foreach(linID = 1:10, .combine=c) %dopar% {
  #load bigmemory
  require(bigmemory)
  # attach the matrix via shared memory??
  m <- attach.big.matrix(mdesc)
  #dummy expression to test data aquisition
  c<-m[1,1]
}
closeAllConnections()

RAM: 上の画像では、メモリが終了して解放さ<code>foreach</code> 中の RAM の使用 れるまでメモリが大幅に増加することがわかります。foreach

4

2 に答える 2

15

foreachこの問題の解決策は、パッケージの作成者である Steve Weston の投稿 (こちら) から確認できると思います。そこで彼は次のように述べています。

doParallel パッケージは、foreach ループで参照されるワーカーに変数を自動エクスポートします。

だから問題は、あなたのコードではあなたの大きな行列cが割り当てで参照されていることだと思いますc<-m[1,1]xyz <- m[1,1]代わりに試してみて、何が起こるか見てください。

以下は、ファイルに基づく を使用した例ですbig.matrix

#create a matrix of size 1GB aproximatelly
n <- 10000
m <- 10000
c <- matrix(runif(n*m),n,m)
#convert it to bigmatrix
x <- as.big.matrix(x = c, type = "double", 
                 separated = FALSE, 
                 backingfile = "example.bin", 
                 descriptorfile = "example.desc")
# get a description of the matrix
mdesc <- describe(x)
# Create the required connections    
cl <- makeCluster(detectCores ())
registerDoSNOW(cl)
## 1) No referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}

ここに画像の説明を入力

## 2) Referencing
out <- foreach(linID = 1:4, .combine=c) %dopar% {
  invisible(c) ## c is referenced and thus exported to workers
  t <- attach.big.matrix("example.desc")
  for (i in seq_len(30L)) {
    for (j in seq_len(m)) {
      y <- t[i,j]
    }
  }
  return(0L)
}
closeAllConnections()

ここに画像の説明を入力

于 2015-07-24T06:36:57.113 に答える
5

または、Linux/Mac を使用していて CoW 共有メモリが必要な場合は、フォークを使用します。mcparallel最初にすべてのデータをメイン スレッドにロードしてから、parallelパッケージから一般的な機能を使用して作業スレッド (フォーク) を起動します。

次のように、ライブラリmccollectを使用して真の共有メモリを使用して、または使用して結果を収集できます。Rdsm

library(parallel)
library(bigmemory) #for shared variables
shared<-bigmemory::big.matrix(nrow = size, ncol = 1, type = 'double')
shared[1]<-1 #Init shared memory with some number

job<-mcparallel({shared[1]<-23}) #...change it in another forked thread
shared[1,1] #...and confirm that it gets changed
# [1] 23

書き込みを遅らせると、値がバックグラウンドで実際に更新されることを確認できます。

fn<-function()
{
  Sys.sleep(1) #One second delay
  shared[1]<-11
}

job<-mcparallel(fn())
shared[1] #Execute immediately after last command
# [1] 23
aaa[1,1] #Execute after one second
# [1] 11
mccollect() #To destroy all forked processes (and possibly collect their output)

同時実行を制御し、競合状態を回避するには、ロックを使用します。

library(synchronicity) #for locks
m<-boost.mutex() #Lets create a mutex "m"

bad.incr<-function() #This function doesn't protect the shared resource with locks:
{
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
}

good.incr<-function()
{
  lock(m)
  a<-shared[1]
  Sys.sleep(1)
  shared[1]<-a+1
  unlock(m)
}

shared[1]<-1
for (i in 1:5) job<-mcparallel(bad.incr())
shared[1] #You can verify, that the value didn't get increased 5 times due to race conditions

mccollect() #To clear all threads, not to get the values
shared[1]<-1
for (i in 1:5) job<-mcparallel(good.incr())
shared[1] #As expected, eventualy after 5 seconds of waiting you get the 6
#[1] 6 

mccollect()

編集:

Rdsm::mgrmakevarに交換することで、依存関係を少し単純化しましbigmemory::big.matrixた。mgrmakevarとにかく内部的に呼び出しbig.matrixます。これ以上は必要ありません。

于 2016-06-22T09:44:34.263 に答える