15

オンラインで検索したところ、名前付きパイプを使用するための簡単な「チュートリアル」が見つかりました。ただし、バックグラウンドジョブで何かを行うと、多くのデータが失われるようです。

[[編集:はるかに簡単な解決策を見つけました。投稿への返信を参照してください。ですから、私が提起した質問は今や学術的なものです-ジョブサーバーが必要な場合に備えて]]

Linux2.6.32でのUbuntu10.04の使用-25-generic#45-Ubuntu SMP Sat Oct 16 19:52:42 UTC 2010 x86_64 GNU / Linux

GNU bash、バージョン4.1.5(1)-リリース(x86_64-pc-linux-gnu)。

私のbash関数は次のとおりです。

function jqs
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read txt <"$pipe"
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      fi
    fi
  done
}

私はこれをバックグラウンドで実行します:

> jqs&
[1] 5336

そして今、私はそれを養います:

for i in 1 2 3 4 5 6 7 8
do
  (echo aaa$i > /tmp/__job_control_manager__ && echo success$i &)
done

出力に一貫性がありません。私は頻繁にすべての成功のエコーを取得しません。成功エコーと同じ数の新しいテキストエコーを取得しますが、それより少ない場合もあります。

「フィード」から「&」を削除すると、機能しているように見えますが、出力が読み取られるまでブロックされます。したがって、サブプロセスをブロックしたいのですが、メインプロセスはブロックしません。

目的は、単純なジョブ制御スクリプトを記述して、最大で10個のジョブを並行して実行し、残りを後で処理するためにキューに入れることができるようにすることですが、それらが実行されることを確実に認識します。

以下の完全なジョブマネージャー:

function jq_manage
{
  export __gn__="$1"

  pipe=/tmp/__job_control_manager_"$__gn__"__
  trap "rm -f $pipe"    EXIT
  trap "break"      SIGKILL

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    date
    jobs
    if (($(jobs | egrep "Running.*echo '%#_Group_#%_$__gn__'" | wc -l) < $__jN__))
    then
      echo "Waiting for new job"
      if read new_job <"$pipe"
      then
    echo "new job is [[$new_job]]"

    if [[ "$new_job" == 'quit' ]]
    then
      break
    fi

    echo "In group $__gn__, starting job $new_job"
    eval "(echo '%#_Group_#%_$__gn__' > /dev/null; $new_job) &"
      fi
    else
      sleep 3
    fi
  done
}

function jq
{
  # __gn__ = first parameter to this function, the job group name (the pool within which to allocate __jN__ jobs)
  # __jN__ = second parameter to this function, the maximum of job numbers to run concurrently

  export __gn__="$1"
  shift
  export __jN__="$1"
  shift

  export __jq__=$(jobs | egrep "Running.*echo '%#_GroupQueue_#%_$__gn__'" | wc -l)
  if (($__jq__ '<' 1))
  then
    eval "(echo '%#_GroupQueue_#%_$__gn__' > /dev/null; jq_manage $__gn__) &"
  fi

  pipe=/tmp/__job_control_manager_"$__gn__"__

  echo $@ >$pipe
}

呼び出し

jq <name> <max processes> <command>
jq abc 2 sleep 20

1つのプロセスを開始します。その部分は正常に機能します。2つ目を開始します。手作業で一つずつうまくいくようです。しかし、上記のより単純な例のように、ループで10を開始すると、システムが失われるようです。

このIPCデータの明らかな損失を解決するために私ができることについてのヒントをいただければ幸いです。

よろしく、アラン。

4

6 に答える 6

27

あなたの問題は以下のifステートメントです:

while true
do
    if read txt <"$pipe"
    ....
done

何が起こっているのかというと、ジョブキューサーバーはループの周りで毎回パイプを開いたり閉じたりしています。これは、一部のクライアントがパイプに書き込もうとしたときに「パイプの破損」エラーが発生していることを意味します。つまり、ライターがパイプを開いた後、パイプのリーダーが消えます。

これを修正するには、サーバーのループを変更して、ループ全体でパイプを1回開きます。

while true
do
    if read txt
    ....
done < "$pipe"

このようにすると、パイプが一度開かれ、開いたままになります。

ループ内のすべての処理では名前付きパイプにstdinがアタッチされるため、ループ内で実行する内容に注意する必要があります。ループ内のすべてのプロセスのstdinを別の場所からリダイレクトするようにしてください。そうしないと、パイプからのデータを消費する可能性があります。

編集:最後のクライアントがパイプを閉じたときに読み取りでEOFが発生するという問題があるため、ファイル記述子を複製するjillesメソッドを使用するか、自分もクライアントであることを確認して書き込み側を維持することができます開いているパイプの:

while true
do
    if read txt
    ....
done < "$pipe" 3> "$pipe"

これにより、パイプの書き込み側がfd 3で開いたままになります。このファイル記述子には、stdinの場合と同じ警告が適用されます。子プロセスがそれを継承しないように、それを閉じる必要があります。おそらくstdinよりも重要ではありませんが、よりクリーンになります。

于 2010-11-27T12:07:49.973 に答える
8

他の回答で述べたように、データが失われないように、FIFOを常に開いたままにしておく必要があります。

ただし、FIFOが開いた後にすべてのライターが終了すると(ライターが存在したため)、読み取りはすぐに戻ります(そしてpoll()戻りますPOLLHUP)。この状態をクリアする唯一の方法は、FIFOを再度開くことです。

POSIXはこれに対する解決策を提供しませんが、少なくともLinuxとFreeBSDは解決策を提供します。読み取りが失敗し始めた場合は、元の記述子を開いたままfifoを再度開きます。LinuxとFreeBSDでは「ハングアップ」状態は特定の開いているファイル記述に対してローカルであるのに対し、POSIXではfifoに対してグローバルであるため、これは機能します。

これは、次のようなシェルスクリプトで実行できます。

while :; do
    exec 3<tmp/testfifo
    exec 4<&-
    while read x; do
        echo "input: $x"
    done <&3
    exec 4<&3
    exec 3<&-
done
于 2010-11-27T21:17:45.707 に答える
2

興味があるかもしれない人のために、[[再編集]] camhとjilesによるコメントに続いて、ここにテストサーバースクリプトの2つの新しいバージョンがあります。

どちらのバージョンも、期待どおりに機能するようになりました。

パイプ管理用のcamhのバージョン:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    fi
  done 3< "$pipe" 4> "$pipe"    # 4 is just to keep the pipe opened so any real client does not end up causing read to return EOF
}

パイプ管理用のジルのバージョン:

function jqs    # Job queue manager
{
  pipe=/tmp/__job_control_manager__
  trap "rm -f $pipe; exit"  EXIT TERM

  if [[ ! -p "$pipe" ]]; then
      mkfifo "$pipe"
  fi

  exec 3< "$pipe"
  exec 4<&-

  while true
  do
    if read -u 3 txt
    then
      echo "$(date +'%Y'): new text is [[$txt]]"

      if [[ "$txt" == 'quit' ]]
      then
    break
      else
        sleep 1
        # process $txt - remember that if this is to be a spawned job, we should close fd 3 and 4 beforehand
      fi
    else
      # Close the pipe and reconnect it so that the next read does not end up returning EOF
      exec 4<&3
      exec 3<&-
      exec 3< "$pipe"
      exec 4<&-
    fi
  done
}

皆様のご協力に感謝いたします。

于 2010-11-27T22:39:11.180 に答える
1

camh&Dennis Williamsonのように、パイプを壊さないでください。

今、私はコマンドラインで直接、より小さな例を持っています:

サーバ:

(
  for i in {0,1,2,3,4}{0,1,2,3,4,5,6,7,8,9};
  do
    if read s;
      then echo ">>$i--$s//";
    else
      echo "<<$i";
    fi;
  done < tst-fifo
)&

クライアント:

(
  for i in {%a,#b}{1,2}{0,1};
  do
    echo "Test-$i" > tst-fifo;
  done
)&

キーラインを次のように置き換えることができます。

    (echo "Test-$i" > tst-fifo&);

パイプに送信されたすべてのクライアントデータが読み取られますが、クライアントのオプション2では、すべてのデータが読み取られる前にサーバーを数回起動する必要がある場合があります。

ただし、読み取りはパイプ内のデータが開始するのを待ちますが、データがプッシュされると、空の文字列を永久に読み取ります。

これを止める方法はありますか?

洞察を再度ありがとう。

于 2010-11-27T17:38:05.833 に答える
0

一方では、問題は私が思っていたよりもひどいです。私のより複雑な例(jq_manage)では、同じデータがパイプから何度も読み取られている(新しいデータが書き込まれていない場合でも)場合があるようです。それに)。

一方、私は簡単な解決策を見つけました(デニスのコメントに従って編集):

function jqn    # compute the number of jobs running in that group
{
  __jqty__=$(jobs | egrep "Running.*echo '%#_Group_#%_$__groupn__'" | wc -l)
}

function jq
{
  __groupn__="$1";  shift   # job group name (the pool within which to allocate $__jmax__ jobs)
  __jmax__="$1";    shift   # maximum of job numbers to run concurrently

  jqn
  while (($__jqty__ '>=' $__jmax__))
  do
    sleep 1
    jqn
  done

  eval "(echo '%#_Group_#%_$__groupn__' > /dev/null; $@) &"
}

チャームのように機能します。ソケットやパイプは必要ありません。単純。

于 2010-11-27T08:37:31.493 に答える
0

最大で10個のジョブを並行して実行し、残りを後で処理するためにキューに入れますが、それらが実行されることを確実に認識します

これはGNUParallelで実行できます。このスクリプトは必要ありません。

http://www.gnu.org/software/parallel/man.html#options

max-procs「ジョブスロット数。最大N個のジョブを並行して実行する」を設定できます。使用するCPUコアの数を設定するオプションがあります。実行されたジョブのリストをログファイルに保存できますが、これはベータ機能です。

于 2013-05-02T16:38:59.833 に答える