0

Rを使用してMySQLデータベースから並行してデータをフェッチしようとしています。次のコードは、データを1つずつフェッチして正常に動作しています。しかし、複数のクエリを送信してさまざまな変数に保存することで、プロセスをスピードアップしたいと考えています。後で、変数内で時系列をマージします。

library(RMySQL)
dbConnect(MySQL(), user='external', password='xxxxxxx', dbname='GMT_Minute_Data', host='xx.xx.xxx.xxx')

sqlData <-select TradeTime, Open, High, Low, Close from ad where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data1= dbFetch(sqlData, n=-1)
sqlData <-select TradeTime, Open, High, Low, Close from ty where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data2 = dbFetch(sqlData, n=-1)
sqlData <-select TradeTime, Open, High, Low, Close from ax where tradetime between ‘2014-01-01’ and ‘2015-10-20’
data3 = dbFetch(sqlData, n=-1)

connections <- dbListConnections(MySQL())
for(i in connections) {dbDisconnect(i)}

次のコードを使用して、データを並行して取得しようとしました。

library(foreach)
library(doParallel)
library(RMySQL)

fetchData<- function(nInst, inst1, inst2, inst3, inst4, inst5, startDate, endDate, con1){

  inst<-NULL
  sqlData <-NULL

  if(nInst==1)
    inst<-inst1
  else if(nInst==2)
    inst<-inst2
  else if(nInst==3)
    inst<-inst3
  else if(nInst==4)
    inst<-inst4
  else if(nInst==5)
    inst<-inst5

  sqlData <- dbSendQuery(con1, paste0('select TradeTime, Open, High, Low, Close from ', inst, ' where tradetime between \'',  startDate, '\' and \'',  endDate, '\'' ))
  data1 = dbFetch(sqlData, n=-1)
  print(head(data1))

  data1 
}

cluster = makeCluster(5, type = "SOCK")
registerDoParallel(cluster)
mydb <- NULL
clusterEvalQ(cluster, {

  mydb <- dbConnect(MySQL(), user='external', password='xxxxxx', dbname='GMT_Minute_Data', host='xx.xx.xxx.xxx')
  NULL
})


allDataList<-foreach(n =1:2, .verbose=TRUE, .packages=('RMySQL')) %dopar% {
  fetchData(n, inst1, inst2, inst3, inst4, inst5, startDate, endDate, mydb)

}
stopCluster(cluster)
on.exit(dbDisconnect(mydb))

コードが最初の計測器のデータのみを取得し、残りの計測器のデータを取得しない場合があります。

誰かが解決策を知っている場合は助けてください。

ありがとう、

4

1 に答える 1

0

mydb問題は、 foreach が変数をワーカーに自動エクスポートしているため、初期化の目的が無効になっていることだと思いますclusterEvalQ。データベース接続をシリアライズして他のマシンに適切に送信することはできないため、 を使用して手動で初期化すると便利ですclusterEvalQ。foreachオプションを使用すると、自動エクスポートされていない.verbose=TRUEことを確認できます。自動エクスポートされていると表示されている場合は、それを防ぐ必要があります。mydb

あなたの例では、ステートメントをmydb削除するだけで自動エクスポートを防ぐことができmydb <- NULLますが、 foreach.noexport='mydb'オプションを使用して、自動エクスポートされないようにすることをお勧めします。これを行う簡単な例を次に示します。

library(doParallel)

fetchData <- function(ignore) {
  mydb
}

cluster <- makeCluster(5, type = "SOCK")
registerDoParallel(cluster)

clusterEvalQ(cluster, {
  mydb <- sample(100, 1) # different value for each worker
  NULL
})

r <- foreach(n=1:2, .noexport='mydb', .verbose=TRUE) %dopar% { 
  fetchData(n)
}

この場合、 foreach はfetchData関数を分析し、 という名前の変数を使用していることに気付きますmydb。したがって、mydbマスターで が定義されている場合、そうしないように指示しない限り、自動エクスポートされます。.noexport='mydb'そのため、ローカル環境で定義されていなくても使用することをお勧めします。関数が破損したデータベース接続を使用しないことを二重に確認します。

于 2016-04-22T13:32:49.400 に答える