Node.js で csv を処理し、Oracle データベースに保存するためのコードに取り組んでいます。これまでのところ、うまくいっていますが、csv に大量の行があると、「ORA-01000: 開いているカーソルの最大数を超えました」というメッセージが表示されます。スクリプトの冒頭で一度 Oracle に接続しています。csv の各レコードについて、複数SELECT
の 、INSERT
、およびDELETE
を実行してから、次のエントリに移動して、同じ接続を使用してすべてを処理しています。最後に、接続を閉じます。私が考えていたのは、プールから毎回新しい接続を取得することでしたが、1 つの接続を使用する必要があるという他の投稿を読みました。これらすべてのクエリを 1 つの接続で処理するには、特別な設定を行う必要があるのではないでしょうか?
台本がちょっと長いので、大事なところだけ載せます… 必要ならもっと載せます。Q
、csvtojson
、およびを使用しoracledb
ます。
...
var conn = null;
function connect() {
var deferred = Q.defer();
oracledb.outFormat = oracledb.OBJECT;
oracledb.getConnection(
{
user : 'foo',
password : 'bar',
connectString : 'foo.bar/bar',
},
function(err, c) {
if(err) deferred.reject(new Error(err));
// set global connection
conn = c;
deferred.resolve();
}
);
return deferred.promise;
}
function closeConnection(conn) {
var deferred = Q.defer();
conn.release(function(err){
if(err) deferred.reject(err);
else return deferred.resolve();
});
return deferred.promise;
}
/* Process All Data, Promise Loop */
function process(data) {
return processEntry(data.shift()).then(function(){
console.log('Finished processing entry.');
return data.length > 0 ? process(data) : true;
});
}
/* Process an Entry */
function processEntry(entry) {
var deferred = Q.defer();
var data = {};
entryExists(entry)
.then(function(result) {
if(result) return entryLogicError(null, 'Entry exists. Skipping.');
else return getUserFromReleaseCode(entry);
})
.then(function(result) {
if(typeof result != 'undefined' && result.length > 0) {
data.user = result[0];
return getPanelCode(entry);
}
else return entryLogicError(entry, 'No valid release code.');
})
.then(function(result){
if(typeof result != 'undefined' && result.length > 0) {
return createHeader(result[0].foo, result[0].bar);
}
else return entryLogicError(entry, 'No valid panel code.');
})
... More of the same kind of statements processing the entry ...
.then(function() {
return logEntry(entry);
})
.catch(function(error) { console.log("DATA ERROR: " + error) })
.done(function(){
deferred.resolve();
});
return deferred.promise;
}
function entryLogicError() {
// logs entry to be corrected, return a rejected promise to go to the next entry
}
/* Check if record has been processed */
function entryExists(entry) {
var deferred = Q.defer();
var foo = entry[ENTRY_CONST.FOO];
var bar = entry[ENTRY_CONST.BAR];
conn.execute(
'SELECT * FROM TBL_FOO ' +
'WHERE FOO = :foo AND ' +
'BAR = :bar',
[foo, bar],
function(err, result) {
if(err) deferred.reject(err);
else {
deferred.resolve(result.rows.length > 0);
}
});
return deferred.promise;
}
/* Get User from Release Code */
function getUserFromReleaseCode(entry) {
var deferred = Q.defer();
var foo = entry[ENTRY_CONST.FOO];
conn.execute(
'SELECT * FROM TBL_BAR ' +
'WHERE FOO = :foo',
[foo],
function(err, result) {
if(err) deferred.reject(err);
else {
deferred.resolve(result.rows);
}
});
return deferred.promise;
}
/* Create Header */
function createHeader(foo, bar) {
var deferred = Q.defer();
conn.execute(
'BEGIN INSERT INTO TBL_DR_FOO VALUES (NULL,:foo, :bar,' +
'1,NULL,1,NULL,NULL,NULL,NULL) RETURNING DR_FOO_ID INTO :DR_FOO_ID; COMMIT; END;',
{ foo: foo,
bar: bar,
DR_FOO_ID: { dir: oracledb.BIND_OUT, type: oracledb.NUMBER }
},
function(err, result) {
if(err) deferred.reject(err);
else deferred.resolve(result.outBinds);
});
return deferred.promise;
}
function cleanHistory() {
// statement that deletes records from a certain date using conn.execute(..)
}
/* Main */
connect().then(function(){
var converter = new Converter({ noheader: false });
converter.on('end_parsed', function(data) {
process(data).then(function(){
return cleanHistory();
})
.then(function(){
return closeConnection();
}).done();
});
fs.createReadStream(batch).pipe(converter);
}, function(err){
return console.error(err);
});