1

この例に従って、ルビーでマルチスレッドループを試みています: http://taw.blogspot.com/2010/05/very-simple-parallelization-with-ruby.html

そのコードをコピーして、次のように書きました。

module Enumerable

    def ignore_exception
        begin
            yield
        rescue Exception => e
            STDERR.puts e.message
        end
    end
    def in_parallel(n)
        t_queue = Queue.new
        threads = (1..n).map {
            Thread.new{ 
                while x = t_queue.deq
                    ignore_exception{ yield(x[0]) }
                end
            }
        }
        each{|x| t_queue << [x]}
        n.times{ t_queue << nil }
        threads.each{|t| 
            t.join 
            unless t[:out].nil?
                puts t[:out]
            end
        }
    end
end

ids.in_parallel(10){ |id|
 conn = open_conn(loc)
 out = conn.getData(id)
 Thread.current[:out] = out
}

私が理解している方法は、一度に 10 個のアイテムをデキューし、id ごとにループでブロックを処理し、最後に 10 個のスレッドに参加し、終了するまで繰り返すことです。このコードを実行した後、特に ID のサイズが 10 未満の場合、異なる結果が得られることがあります。なぜこれが発生しているのか混乱しています。それらのIDの出力が存在することをサーバー側で確認できても、半分の時間は最大半分のIDに対して何も出力しません。たとえば、正しい出力が "Got id 1" と "Got id 2" の場合、{"Got id 1"} または {"Got id 2"} または {"Got id 1", "Got id 2"}。私の質問は、このコードの私の理解は正しいですか?

4

1 に答える 1

0

私のコードの問題は、open_conn()スレッドセーフではない関数呼び出しでした。接続ハンドルの取得を同期することで問題を修正しました。

connLock = Mutex.new
ids.in_parallel(10){ |id|
 conn = nil
 connLock.synchronize {
    conn = open_conn(loc)
 }
 out = conn.getData(id)
 Thread.current[:out] = out
}

また、次を使用 してループの並列化にhttp://peach.rubyforge.org/を使用する必要があります。

 ids.peach(10){ |id| ... }
于 2013-09-18T17:58:22.443 に答える