7

core.async walk through exampleに基づいて、以下の同様のコードを作成し、10 秒のタイムアウトで複数のチャネルを使用して CPU を集中的に使用するジョブを処理しました。ただし、メインスレッドが戻った後、CPU 使用率は約 700% のままです (8 CPU マシン)。Java プロセスをシャットダウンするには、emacs で手動で nrepl-close を実行する必要があります。

(go..) block によって生成されたマクロスレッドを強制終了する適切な方法はありますか? 近づいてみた!それぞれのちゃんですが、うまくいきません。メインスレッドが戻った後、JavaプロセスによってCPU使用率が0に戻ることを確認したい。

(defn [] RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB (do...   (str ...)))


(let [n 1000
      cs (repeatedly n chan)]
  (doseq [c cs] 
    (go 
     (>! c  (RETURNED-STR-FROM-SOME-CPU-INTENSE-JOB ))))

  (dotimes [i n]
    (let [[result source] (alts!!  (conj cs (timeout 10000))) ]  ;;wait for 10 seconds for each job
      (if  (list-contains? cs source)  ;;if returned chan belongs to cs 
        (prn "OK JOB FINISHED " result)
        (prn "JOB TIMEOUT")
        )))

 (doseq [i cs]
   (close! i))  ;;not useful for "killing" macro thread

 (prn "JOBS ARE DONE"))

;;Btw list-contains? function is used to judge whether an element is in a list
;;http://stackoverflow.com/questions/3249334/test-whether-a-list-contains-a-specific-value-in-clojure
(defn list-contains? [coll value]
  (let [s (seq coll)]
    (if s
      (if (= (first s) value) true (recur (rest s) value))
      false)))
4

3 に答える 3

2

REPL にはまだクリーンな方法がないようです。

非推奨のメソッド Thread.stop を使用して、非常に汚い方法を最初に試しました

 (doseq [i @threadpool ]
              (.stop i))

メインスレッドが REPL に戻ると CPU 使用率が低下したため動作しているように見えましたが、REPL でプログラムを再度実行すると、go ブロック部分でハングするだけです!!

それから私はググって、このブログを見つけました、そしてそれは言います

最後に、go ルーチンをシャットダウンする作業を明示的に行っていません。メイン関数が終了すると、Go ルーチンは自動的に動作を停止します。したがって、go ルーチンは JVM のデーモン スレッドのようなものです (「スレッド」部分を除いて ...)

そこで、プロジェクトを uberjar にしてコマンド コンソールで実行して再試行したところ、点滅カーソルがコンソールに戻るとすぐに CPU 使用率が低下することがわかりました。

于 2013-09-10T02:01:01.160 に答える
1

別の関連する質問How to control number of threads in (go...) の回答に基づいて、 (go...) ブロックによって開始されたすべてのスレッドを適切に強制終了するより良い方法を見つけました。

最初にexecutor varを変更し、カスタムスレッドプールを提供します

;; def, not defonce, so that the executor can be re-defined
;; Number of threads are fixed to be 4
(def my-executor
  (java.util.concurrent.Executors/newFixedThreadPool
   4
   (conc/counted-thread-factory "my-async-dispatch-%d" true)))

(alter-var-root #'clojure.core.async.impl.dispatch/executor
                (constantly (delay (tp/thread-pool-executor my-executor))))

次に、(go...) ブロックの最後で .shutdownNow および .awaitTermination メソッドを呼び出します

(.shutdownNow my-executor)
(while (not  (.awaitTermination  my-executor 10 java.util.concurrent.TimeUnit/SECONDS ) )
       (prn "...waiting 10 secs for executor pool to finish") )

[更新] 上記の shutdown executor メソッドは十分に純粋ではないようです。私の場合の最終的な解決策は、 thunk-timeout関数を使用して、独自のタイムアウトを制御する関数を go ブロックに送信することです。クレジットはこの投稿に送られます。以下の例

(defn toSendToGo [args timeoutUnits]
  (let [result (atom nil)  
        timeout? (atom false)]
    (try
      ( thunk-timeout
        (fn []  (reset! result  (myFunction args))) timeoutUnits)
      (catch  java.util.concurrent.TimeoutException e  (do  (prn "!Time out after " timeoutUnits " seconds!!") (reset! timeout? true))     ))

    (if @timeout?  (do sth))
    @result))


(let [c ( chan)]
  (go (>! c (toSendToGo args timeoutUnits))))
于 2013-09-25T04:42:37.407 に答える
1
(shutdown-agents)

実装固有の JVM:エージェントとチャネルの両方がグローバル スレッド プールを使用し、エージェントの終了関数は、VM で開いているすべてのスレッドを繰り返して閉じます。最初にチャンネルを空にしてください : このアクションは即時であり、元に戻すことはできません (特に REPL の場合)。

于 2016-04-07T19:00:08.673 に答える