1

私はNodeとExpress.jsに比較的慣れていません。ファイル自体に保存されている不規則な間隔で CSV データを行ごとにプッシュする websocket サーバーを作成しようとしています。CSV 構造は次のようなものです: [timeout [ms], data1, data2, data3 ...]

クライアントと通信する websocket サーバーの作成に成功しました。

このようなことを効果的に行うための最良の解決策を探しています: 1. CSV ファイルの行を読み取る 2. WebSockets で行を送信する 3. 行 4 の最初の値に格納されている期間、読み取りを一時停止します. 間隔が経過したら読み取りを再開し、手順 1 に戻ります。

これまでのところ、私はここまで来ました (非常に間違っている可能性があるため、コードを完全に破棄してください。言ったように、私はそれに慣れていません。pause() は何もしないようです。

var $    = require('jquery')
,csv = require('csv');

exports.index = function(server){
  var io   = require('socket.io').listen(server);

  io.sockets.on('connection', function (socket) {

  socket.on('startTransmission', function(msg) {
    csv()
    .from.path('C:/dev/node_express/csv/test.csv', { delimiter: ',', escape: '"' })
    .on('record', function(row,index){
      var rowArray = $.parseJSON(JSON.stringify(row));
      var json = {},
          that = this;
        $.each(rowArray, function(i,value){
          json[keys[i]] = value;
        });
        socket.emit('transmitDataData', json);
        //this.pause(); //I guess around here is where I'd like to pause 
        // setTimeout(function(){
        //   that.resume();  //and resume here after the timeout, stored in the first value (rowArray[0])    
        // }, rowArray[0]);

    });
});
});
};

残念ながら、コメントアウトされたコードは機能しません - すべてのデータが行ごとにすぐに送信され、関数は一時停止しません

4

1 に答える 1

0

私は別のユースケースで同じようなことに遭遇しました。問題は、ストリームで pause() を呼び出すと、基になるストリームの読み取りが一時停止されますが、csv レコードの解析は一時停止されないためrecord、最後の読み取りストリーム チャンクを構成する残りのレコードでイベントが呼び出される可能性があることです。私の場合、次のように同期しました。

var rows=0, actions=0;

stream.on('record', function(row, index){                                                                 

    rows++;                                

    // pause here, but expect more record events until the raw read stream is exhausted
    stream.pause();

    runner.do(row, function(err, result) {                                                 

        // when actions have caught up to rows read, read more rows.
        if (actions==rows) {
            stream.resume();
        }                    
    });
});

あなたの場合、行をバッファリングし、タイマーで解放します。これは、私が何を意味するのかを理解するためだけに、テストされていないリファクタリングです。

var $ = require('jquery'),
    csv = require('csv');

exports.index = function(server){

  var io = require('socket.io').listen(server);
  io.sockets.on('connection', function (socket) {

      socket.on('startTransmission', function(msg) {

        var timer=null, buffered=[], stream=csv().from.path('C:/dev/node_express/csv/test.csv', { delimiter: ',', escape: '"' });

        function transmit(row) {        
            socket.emit('transmitDataData', row);                                     
        }       

        function drain(timeout) {                                                    
            if (!timer) {
                timer = setTimeout(function() {                                    
                    timer = null;
                    if (buffered.length<=1) { // get more rows ahead of time so we don't run out. otherwise, we could skip a beat.
                        stream.resume(); // get more rows
                    } else {                        
                        var row = buffered.shift();
                        transmit(row);
                        drain(row[0]);                        
                    }

                }, timeout);               
            }                
        }

        stream.on('record', function(row,index){                        
            stream.pause();                                                                                   
            if (index == 0) {                            
                transmit(row);                                               
            } else {                            
                buffered.push(row);                                   
            }                                                       
            drain(row[0]); // assuming row[0] contains a timeout value.                                                                  
        });

        stream.on('end', function() {
            // no more rows. wait for buffer to empty, then cleanup.
        });

        stream.on('error', function() {
            // handle error.
        });

    });
};
于 2013-08-23T03:28:26.273 に答える