2

Node.js で csv を処理し、Oracle データベースに保存するためのコードに取り組んでいます。これまでのところ、うまくいっていますが、csv に大量の行があると、「ORA-01000: 開いているカーソルの最大数を超えました」というメッセージが表示されます。スクリプトの冒頭で一度 Oracle に接続しています。csv の各レコードについて、複数SELECTの 、INSERT、およびDELETEを実行してから、次のエントリに移動して、同じ接続を使用してすべてを処理しています。最後に、接続を閉じます。私が考えていたのは、プールから毎回新しい接続を取得することでしたが、1 つの接続を使用する必要があるという他の投稿を読みました。これらすべてのクエリを 1 つの接続で処理するには、特別な設定を行う必要があるのではないでしょうか?

台本がちょっと長いので、大事なところだけ載せます… 必要ならもっと載せます。Qcsvtojson、およびを使用しoracledbます。

...

var conn = null;

function connect() {
    var deferred = Q.defer();

    oracledb.outFormat = oracledb.OBJECT;

    oracledb.getConnection(
        {
            user                : 'foo',
            password            : 'bar',
            connectString       : 'foo.bar/bar',
        }, 
        function(err, c) {
            if(err) deferred.reject(new Error(err));

            // set global connection
            conn = c;
            deferred.resolve();
        }
    );

    return deferred.promise;
}

function closeConnection(conn) {
    var deferred = Q.defer();

    conn.release(function(err){
        if(err) deferred.reject(err);
        else return deferred.resolve();
    });

    return deferred.promise;
}

/* Process All Data, Promise Loop */
function process(data) {
    return processEntry(data.shift()).then(function(){

            console.log('Finished processing entry.');

            return data.length > 0 ? process(data) : true;
    });
}

/* Process an Entry */
function processEntry(entry) {
    var deferred = Q.defer();

    var data = {};

    entryExists(entry)
        .then(function(result) {
            if(result) return entryLogicError(null, 'Entry exists. Skipping.');
            else return getUserFromReleaseCode(entry);
        })
        .then(function(result) {
            if(typeof result != 'undefined' && result.length > 0) {
                data.user = result[0];
                return getPanelCode(entry);
            }
            else return entryLogicError(entry, 'No valid release code.');
       })
       .then(function(result){
           if(typeof result != 'undefined' && result.length > 0) {
               return createHeader(result[0].foo, result[0].bar);
           }
           else return entryLogicError(entry, 'No valid panel code.');
       })   

     ... More of the same kind of statements processing the entry ...

       .then(function() {
           return logEntry(entry);
       })
       .catch(function(error) { console.log("DATA ERROR: " + error) })
       .done(function(){
           deferred.resolve();
       });


    return deferred.promise;
}

function entryLogicError() {
    // logs entry to be corrected, return a rejected promise to go to the next entry
}

/* Check if record has been processed */
function entryExists(entry) {
    var deferred = Q.defer();

    var foo = entry[ENTRY_CONST.FOO];
    var bar = entry[ENTRY_CONST.BAR];

    conn.execute(
        'SELECT * FROM TBL_FOO ' +
        'WHERE FOO = :foo AND ' +
        'BAR = :bar',
        [foo, bar],
        function(err, result) {
            if(err) deferred.reject(err);
            else {
                deferred.resolve(result.rows.length > 0);
            }
        });

    return deferred.promise;
}

/* Get User from Release Code */
function getUserFromReleaseCode(entry) {
    var deferred = Q.defer();

    var foo = entry[ENTRY_CONST.FOO];

    conn.execute(
        'SELECT * FROM TBL_BAR ' +
        'WHERE FOO = :foo',
        [foo],
        function(err, result) {
            if(err) deferred.reject(err);
            else {
                deferred.resolve(result.rows);
            }
        });

    return deferred.promise;
}


  /* Create Header */
function createHeader(foo, bar) {

    var deferred = Q.defer();
    conn.execute(
        'BEGIN INSERT INTO TBL_DR_FOO VALUES (NULL,:foo, :bar,' +
        '1,NULL,1,NULL,NULL,NULL,NULL) RETURNING DR_FOO_ID INTO :DR_FOO_ID; COMMIT; END;',
        {   foo: foo,
            bar: bar,
            DR_FOO_ID: { dir: oracledb.BIND_OUT, type: oracledb.NUMBER }
        },
        function(err, result) {
            if(err) deferred.reject(err);
            else deferred.resolve(result.outBinds);
        });

    return deferred.promise;
}

function cleanHistory() {
    // statement that deletes records from a certain date using conn.execute(..)
}

/* Main */
connect().then(function(){
    var converter = new Converter({ noheader: false });

    converter.on('end_parsed', function(data) {

        process(data).then(function(){ 
            return cleanHistory();
        })
        .then(function(){
            return closeConnection();
        }).done();

    });

    fs.createReadStream(batch).pipe(converter);


}, function(err){
    return console.error(err);
});
4

0 に答える 0