7

現時点では、長時間実行されるタスク(ジョブ)を処理するRESTfulAPIを使用してWebサービスを構築しようとしています。

アイデアは、ユーザーがPOSTを実行してジョブを送信することです。このPOSTは、結果のURLも含むジョブステータスをチェックするためのURLを返します。ジョブが完了すると(つまり、何らかの値がデータベースに書き込まれると)、結果のURLは(結果がないのではなく)適切な情報を返し、ジョブのURLは完了したステータスを示します。

残念ながら、計算は非常に集中的であるため、一度に実行できるのは1つだけであるため、ジョブをキューに入れる必要があります。

疑似的にはこのようなものが必要になります

(def job-queue (atom queue)) ;; some queue 
(def jobs (atom {}))

(defn schedule-job [params] 
  ;; schedules the job into the queue and 
  ;; adds the job to a jobs map for checking status via GET
  ;; note that the job should not  be evaluated until popped from the queue
)

(POST "/analyze" [{params :params}] 
 (schedulde-job params))

(GET "job/:id" [:d] 
 (get @jobs id))

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete
;; Note: should not terminate when queue is empty! 

非同期処理を可能にするLaminaを調べましたが、ニーズに合わなかったようです。

私の質問は、キューが空になったときに終了せずに、つまり着信ジョブを永続的に処理することなく、ジョブキューをデキューして、前のタスクが終了した後にそのタスクを実行する方法です。

4

3 に答える 3

9

java.util.concurrent.ExecutorServiceが必要な場合があります。これにより、後で実行するためにジョブを送信し、クエリして完了したかどうかを検出できるFutureを返すことができます。

(import '[java.util.concurrent Callable Executors])

(def job-executor
  (Executors/newSingleThreadExecutor))

(def jobs (atom {}))

(defn submit-job [func]
  (let [job-id   (str (java.util.UUID/randomUUID))
        callable (reify Callable (call [_] (func))]
    (swap! jobs assoc job-id (.submit job-executor callable))
    job-id))

(use 'compojure.core)

(defroutes app
  (POST "/jobs" [& params]
    (let [id (submit-job #(analyze params))]
      {:status 201 :headers {"Location" (str "/jobs/" id)}}))
  (GET "/jobs/:id" [id]
    (let [job-future (@jobs id)]
      (if (.isDone job-future)
        (.get job-future)
        {:status 404}))))
于 2013-02-03T20:12:11.397 に答える
2

これは私が期待したことをしているように見えますが、それはかなり非慣用的なようです。誰かがこれを改善する方法について考えていますか?

;; Create a unique identifier
(defn uuid [] (str (java.util.UUID/randomUUID)))

;; Create a job-queue and a map for keeping track of the status
(def job-queue (ref clojure.lang.PersistentQueue/EMPTY))
(def jobs (atom {}))

(defn dequeue! [queue-ref]
  ;; Pops the first element off the queue-ref
  (dosync 
    (let [item (peek @queue-ref)]
      (alter queue-ref pop)
      item)))

(defn schedule-job! [task] 
  ;; Schedule a task to be executed, expects a function (task) to be evaluated
  (let [uuid (uuid)
        job (delay task)]
    (dosync 
      (swap! jobs assoc uuid job) 
      (alter job-queue conj job))))

(defn run-jobs []
  ;; Runs the jobs 
  (while true
    (Thread/sleep 10)
    (let [curr (dequeue! job-queue)] 
      (if-not (nil? curr) (@curr)))))

(.start (Thread. run-jobs))
于 2013-02-03T16:52:34.823 に答える
0

あなたの説明は、複数のプロデューサーと単一のコンシューマーのシナリオのようです。以下はサンプルコードです(エージェントが死なないように、RESTのものと、場合によってはいくつかの例外処理に接続できます)

(def worker (agent {}))                                                                                                                              

(defn do-task [name func]                                                                                                                            
  (send worker                                                                                                                                       
        (fn [results]                                                                                                                                 
          (let [r (func)]                                                                                                                            
            (assoc results name r)))))

;submit tasks                                                                                                               
(do-task "uuid1" #(print 10))                                                                                                                        
(do-task "uuid2" #(+ 1 1))

;get all results
(print @worker) 
于 2013-02-03T17:26:44.023 に答える