288

現在、Python2.6multiprocessingモジュールを使用して複数のプロセスを生成するフレームワークに中央モジュールがあります。を使用するためmultiprocessing、モジュールレベルのマルチプロセッシング対応ログがありますLOG = multiprocessing.get_logger()。ドキュメントによるsys.stderr、このロガーにはプロセス共有ロックがあり、複数のプロセスが同時に書き込みを行うことで、(またはファイルハンドルが)文字化けしないようになっています。

私が今抱えている問題は、フレームワークの他のモジュールがマルチプロセッシングに対応していないことです。私の見方では、この中央モジュールへのすべての依存関係でマルチプロセッシング対応のロギングを使用する必要があります。これは、フレームワークのすべてのクライアントにとっては言うまでもなく、フレームワーク内では厄介です。私が考えていない代替案はありますか?

4

23 に答える 23

142

私は今、パイプを介して親プロセスにすべてをフィードする独自のログハンドラーを作成しました。10 分間しかテストしていませんが、かなりうまく機能しているようです。

(注:これはRotatingFileHandler、私自身の使用例である にハードコードされています。)


更新: @javier は、このアプローチを Pypi で利用可能なパッケージとして維持するようになりました - Pypiのmultiprocessing-loggingを参照してください。


更新:実装!

これは、同時実行を正しく処理するためにキューを使用し、エラーから正しく回復するようになりました。私は現在、これを本番環境で数か月使用しており、以下の現在のバージョンは問題なく動作します。

from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback

class MultiProcessingLog(logging.Handler):
    def __init__(self, name, mode, maxsize, rotate):
        logging.Handler.__init__(self)

        self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
        self.queue = multiprocessing.Queue(-1)

        t = threading.Thread(target=self.receive)
        t.daemon = True
        t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        while True:
            try:
                record = self.queue.get()
                self._handler.emit(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

    def send(self, s):
        self.queue.put_nowait(s)

    def _format_record(self, record):
        # ensure that exc_info and args
        # have been stringified.  Removes any chance of
        # unpickleable things inside and possibly reduces
        # message size sent over the pipe
        if record.args:
            record.msg = record.msg % record.args
            record.args = None
        if record.exc_info:
            dummy = self.format(record)
            record.exc_info = None

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        self._handler.close()
        logging.Handler.close(self)
于 2009-05-21T18:10:33.620 に答える
36

QueueHandlerは Python 3.2+ でネイティブであり、まさにこれを行います。以前のバージョンで簡単に複製できます。

Python ドキュメントには、2 つの完全な例があります。 複数のプロセスから単一のファイルへのログ記録

Python < 3.2 を使用している場合は、 https://gist.github.com/vsajip/591589QueueHandlerから独自のコードにコピーするか、代わりに logutilsをインポートします。

各プロセス (親プロセスを含む) はログを に置き、Queue次にlistenerスレッドまたはプロセス (それぞれに 1 つの例を示します) がそれらを取得し、それらすべてをファイルに書き込みます。破損や文字化けのリスクはありません。

于 2015-08-18T06:47:17.583 に答える
32

以下は、Google からここにアクセスする他のユーザー (私のような) のためのシンプルさに重点を置いた別のソリューションです。ロギングは簡単なはずです!3.2 以降のみ。

import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random


def f(i):
    time.sleep(random.uniform(.01, .05))
    logging.info('function called with {} in worker thread.'.format(i))
    time.sleep(random.uniform(.01, .05))
    return i


def worker_init(q):
    # all records from worker processes go to qh and then into q
    qh = QueueHandler(q)
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    logger.addHandler(qh)


def logger_init():
    q = multiprocessing.Queue()
    # this is the handler for all log records
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))

    # ql gets records from the queue and sends them to the handler
    ql = QueueListener(q, handler)
    ql.start()

    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # add the handler to the logger so records from this process are handled
    logger.addHandler(handler)

    return ql, q


def main():
    q_listener, q = logger_init()

    logging.info('hello from main thread')
    pool = multiprocessing.Pool(4, worker_init, [q])
    for result in pool.map(f, range(10)):
        pass
    pool.close()
    pool.join()
    q_listener.stop()

if __name__ == '__main__':
    main()
于 2016-01-23T13:59:47.747 に答える
22

loggingさらに別の代替手段は、パッケージ内のさまざまな非ファイルベースのロギングハンドラーである可能性があります。

  • SocketHandler
  • DatagramHandler
  • SyslogHandler

(その他)

このようにして、安全に書き込み、結果を正しく処理できるロギングデーモンをどこかに簡単に配置できます。(たとえば、メッセージの選択を解除し、それを独自のローテーションファイルハンドラーに送信する単純なソケットサーバー。)

もあなたのSyslogHandlerためにこれを世話します。もちろん、syslogシステムのインスタンスではなく、独自のインスタンスを使用することもできます。

于 2009-03-13T11:19:29.113 に答える
14

ロギングとキュー スレッドを分離しておく他のバリアント。

"""sample code for logging in subprocesses using multiprocessing

* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
  process.
* As in the other implementations, a thread reads the queue and calls the
  handlers. Except in this implementation, the thread is defined outside of a
  handler, which makes the logger definitions simpler.
* Works with multiple handlers.  If the logger in the main process defines
  multiple handlers, they will all be fed records generated by the
  subprocesses loggers.

tested with Python 2.5 and 2.6 on Linux and Windows

"""

import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys

DEFAULT_LEVEL = logging.DEBUG

formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")

class SubProcessLogHandler(logging.Handler):
    """handler used by subprocesses

    It simply puts items on a Queue for the main process to log.

    """

    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue

    def emit(self, record):
        self.queue.put(record)

class LogQueueReader(threading.Thread):
    """thread to write subprocesses log records to main process log

    This thread reads the records written by subprocesses and writes them to
    the handlers defined in the main process's handlers.

    """

    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue
        self.daemon = True

    def run(self):
        """read from the queue and write to the log handlers

        The logging documentation says logging is thread safe, so there
        shouldn't be contention between normal logging (from the main
        process) and this thread.

        Note that we're using the name of the original logger.

        """
        # Thanks Mike for the error checking code.
        while True:
            try:
                record = self.queue.get()
                # get the logger for this record
                logger = logging.getLogger(record.name)
                logger.callHandlers(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except:
                traceback.print_exc(file=sys.stderr)

class LoggingProcess(multiprocessing.Process):

    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue

    def _setupLogger(self):
        # create the logger to use.
        logger = logging.getLogger('test.subprocess')
        # The only handler desired is the SubProcessLogHandler.  If any others
        # exist, remove them. In this case, on Unix and Linux the StreamHandler
        # will be inherited.

        for handler in logger.handlers:
            # just a check for my sanity
            assert not isinstance(handler, SubProcessLogHandler)
            logger.removeHandler(handler)
        # add the handler
        handler = SubProcessLogHandler(self.queue)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # On Windows, the level will not be inherited.  Also, we could just
        # set the level to log everything here and filter it in the main
        # process handlers.  For now, just set it from the global default.
        logger.setLevel(DEFAULT_LEVEL)
        self.logger = logger

    def run(self):
        self._setupLogger()
        logger = self.logger
        # and here goes the logging
        p = multiprocessing.current_process()
        logger.info('hello from process %s with pid %s' % (p.name, p.pid))


if __name__ == '__main__':
    # queue used by the subprocess loggers
    queue = multiprocessing.Queue()
    # Just a normal logger
    logger = logging.getLogger('test')
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    logger.addHandler(handler)
    logger.setLevel(DEFAULT_LEVEL)
    logger.info('hello from the main process')
    # This thread will read from the subprocesses and write to the main log's
    # handlers.
    log_queue_reader = LogQueueReader(queue)
    log_queue_reader.start()
    # create the processes.
    for i in range(10):
        p = LoggingProcess(queue)
        p.start()
    # The way I read the multiprocessing warning about Queue, joining a
    # process before it has finished feeding the Queue can cause a deadlock.
    # Also, Queue.empty() is not realiable, so just make sure all processes
    # are finished.
    # active_children joins subprocesses when they're finished.
    while multiprocessing.active_children():
        time.sleep(.1)
于 2010-07-15T07:41:56.800 に答える
8

マルチプロセス ロギングを多くのパブリッシャーと 1 つのサブスクライバー (リスナー) として表すことができるため、ZeroMQを使用して PUB-SUB メッセージングを実装することは実際にオプションです。

さらに、ZMQの Python バインディングである PyZMQ モジュールは、zmq.PUB ソケットを介してログ メッセージを公開するためのオブジェクトであるPUBHandlerを実装します。

Webには、PyZMQ と PUBHandler を使用して分散アプリケーションから集中ログを記録するためのソリューションがあり、複数の公開プロセスでローカルで作業するために簡単に採用できます。

formatters = {
    logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
    logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
    logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
    logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
    logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}

# This one will be used by publishing processes
class PUBLogger:
    def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
        self._logger = logging.getLogger(__name__)
        self._logger.setLevel(logging.DEBUG)
        self.ctx = zmq.Context()
        self.pub = self.ctx.socket(zmq.PUB)
        self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
        self._handler = PUBHandler(self.pub)
        self._handler.formatters = formatters
        self._logger.addHandler(self._handler)

    @property
    def logger(self):
        return self._logger

# This one will be used by listener process
class SUBLogger:
    def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
        self.output_dir = output_dir
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.DEBUG)

        self.ctx = zmq.Context()
        self._sub = self.ctx.socket(zmq.SUB)
        self._sub.bind('tcp://*:{1}'.format(ip, port))
        self._sub.setsockopt(zmq.SUBSCRIBE, "")

        handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
        handler.setLevel(logging.DEBUG)
        formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
        handler.setFormatter(formatter)
        self._logger.addHandler(handler)

  @property
  def sub(self):
      return self._sub

  @property
  def logger(self):
      return self._logger

#  And that's the way we actually run things:

# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
    sub_logger = SUBLogger(ip)
    while not event.is_set():
        try:
            topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
            log_msg = getattr(logging, topic.lower())
            log_msg(message)
        except zmq.ZMQError as zmq_error:
            if zmq_error.errno == zmq.EAGAIN:
                pass


# Publisher processes loggers should be initialized as follows:

class Publisher:
    def __init__(self, stop_event, proc_id):
        self.stop_event = stop_event
        self.proc_id = proc_id
        self._logger = pub_logger.PUBLogger('127.0.0.1').logger

     def run(self):
         self._logger.info("{0} - Sending message".format(proc_id))

def run_worker(event, proc_id):
    worker = Publisher(event, proc_id)
    worker.run()

# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
                                 args=('127.0.0.1'), stop_event,))
sub_logger_process.start()

#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
    processes.append(Process(target=run_worker,
                                 args=(stop_event, i,)))
for p in processes:
    p.start()
于 2016-10-19T09:11:43.140 に答える
6

私もzzzeekの答えが好きですが、文字化けを防ぐためにキューが必要であるというアンドレは正しいです。パイプには運が良かったのですが、多少予想される文字化けが見られました。それを実装することは、特に Windows で実行するために、思ったよりも難しいことが判明しました。グローバル変数などに関するいくつかの追加の制限があります ( How's Python Multiprocessing Implemented on Windows?を参照してください) 。

しかし、私はついにそれを機能させました。この例はおそらく完璧ではないので、コメントや提案を歓迎します。また、フォーマッターやルート ロガー以外の設定もサポートしていません。基本的に、キューを使用して各プール プロセスでロガーを再初期化し、ロガーに他の属性を設定する必要があります。

繰り返しになりますが、コードを改善する方法に関する提案は大歓迎です。確かに、まだすべての Python のトリックを知っているわけではありません :-)

import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue

class MultiProcessingLogHandler(logging.Handler):
    def __init__(self, handler, queue, child=False):
        logging.Handler.__init__(self)

        self._handler = handler
        self.queue = queue

        # we only want one of the loggers to be pulling from the queue.
        # If there is a way to do this without needing to be passed this
        # information, that would be great!
        if child == False:
            self.shutdown = False
            self.polltime = 1
            t = threading.Thread(target=self.receive)
            t.daemon = True
            t.start()

    def setFormatter(self, fmt):
        logging.Handler.setFormatter(self, fmt)
        self._handler.setFormatter(fmt)

    def receive(self):
        #print "receive on"
        while (self.shutdown == False) or (self.queue.empty() == False):
            # so we block for a short period of time so that we can
            # check for the shutdown cases.
            try:
                record = self.queue.get(True, self.polltime)
                self._handler.emit(record)
            except Queue.Empty, e:
                pass

    def send(self, s):
        # send just puts it in the queue for the server to retrieve
        self.queue.put(s)

    def _format_record(self, record):
        ei = record.exc_info
        if ei:
            dummy = self.format(record) # just to get traceback text into record.exc_text
            record.exc_info = None  # to avoid Unpickleable error

        return record

    def emit(self, record):
        try:
            s = self._format_record(record)
            self.send(s)
        except (KeyboardInterrupt, SystemExit):
            raise
        except:
            self.handleError(record)

    def close(self):
        time.sleep(self.polltime+1) # give some time for messages to enter the queue.
        self.shutdown = True
        time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown

    def __del__(self):
        self.close() # hopefully this aids in orderly shutdown when things are going poorly.

def f(x):
    # just a logging command...
    logging.critical('function number: ' + str(x))
    # to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
    time.sleep(x % 3)

def initPool(queue, level):
    """
    This causes the logging module to be initialized with the necessary info
    in pool threads to work correctly.
    """
    logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
    logging.getLogger('').setLevel(level)

if __name__ == '__main__':
    stream = StringIO.StringIO()
    logQueue = multiprocessing.Queue(100)
    handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
    logging.getLogger('').addHandler(handler)
    logging.getLogger('').setLevel(logging.DEBUG)

    logging.debug('starting main')

    # when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
    pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
    pool.map(f, range(0,50))
    pool.close()

    logging.debug('done')
    logging.shutdown()
    print "stream output is:"
    print stream.getvalue()
于 2009-06-17T21:15:15.223 に答える
3

私はzzzeekの答えが好きでした。複数のスレッド/プロセスが同じパイプエンドを使用してログメッセージを生成すると、文字化けするため、パイプをキューに置き換えるだけです。

于 2009-06-06T13:59:52.723 に答える
1

これが私の簡単なハック/回避策です...最も包括的ではありませんが、簡単に変更でき、読みやすく理解しやすいと思いますこれを書く前に見つけた他のどの答えよりも:

import logging
import multiprocessing

class FakeLogger(object):
    def __init__(self, q):
        self.q = q
    def info(self, item):
        self.q.put('INFO - {}'.format(item))
    def debug(self, item):
        self.q.put('DEBUG - {}'.format(item))
    def critical(self, item):
        self.q.put('CRITICAL - {}'.format(item))
    def warning(self, item):
        self.q.put('WARNING - {}'.format(item))

def some_other_func_that_gets_logger_and_logs(num):
    # notice the name get's discarded
    # of course you can easily add this to your FakeLogger class
    local_logger = logging.getLogger('local')
    local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
    local_logger.debug('hmm, something may need debugging here')
    return num*2

def func_to_parallelize(data_chunk):
    # unpack our args
    the_num, logger_q = data_chunk
    # since we're now in a new process, let's monkeypatch the logging module
    logging.getLogger = lambda name=None: FakeLogger(logger_q)
    # now do the actual work that happens to log stuff too
    new_num = some_other_func_that_gets_logger_and_logs(the_num)
    return (the_num, new_num)

if __name__ == '__main__':
    multiprocessing.freeze_support()
    m = multiprocessing.Manager()
    logger_q = m.Queue()
    # we have to pass our data to be parallel-processed
    # we also need to pass the Queue object so we can retrieve the logs
    parallelable_data = [(1, logger_q), (2, logger_q)]
    # set up a pool of processes so we can take advantage of multiple CPU cores
    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
    worker_output = pool.map(func_to_parallelize, parallelable_data)
    pool.close() # no more tasks
    pool.join()  # wrap up current tasks
    # get the contents of our FakeLogger object
    while not logger_q.empty():
        print logger_q.get()
    print 'worker output contained: {}'.format(worker_output)
于 2016-09-13T16:55:19.867 に答える
1

コードの一部で logging.exception を使用していることを除いて、アイアンハッカーのソリューションに似たソリューションがあり、トレースバックはピクルできないため、例外をキューに渡す前にフォーマットする必要があることがわかりました。

class QueueHandler(logging.Handler):
    def __init__(self, queue):
        logging.Handler.__init__(self)
        self.queue = queue
    def emit(self, record):
        if record.exc_info:
            # can't pass exc_info across processes so just format now
            record.exc_text = self.formatException(record.exc_info)
            record.exc_info = None
        self.queue.put(record)
    def formatException(self, ei):
        sio = cStringIO.StringIO()
        traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
        s = sio.getvalue()
        sio.close()
        if s[-1] == "\n":
            s = s[:-1]
        return s
于 2010-08-05T06:11:37.660 に答える
1

これが必要な人のために、現在のプロセス名をログに追加する multiprocessing_logging パッケージのデコレーターを作成したので、誰が何をログに記録したかが明確になります。

また、install_mp_handler() も実行するため、プールを作成する前に実行するのは役に立ちません。

これにより、どのワーカーがどのログ メッセージを作成するかを確認できます。

例を含む青写真は次のとおりです。

import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging

# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)


# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
    class MultiProcessLogFilter(logging.Filter):
        def filter(self, record):
            try:
                process_name = multiprocessing.current_process().name
            except BaseException:
                process_name = __name__
            record.msg = f'{process_name} :: {record.msg}'
            return True

    multiprocessing_logging.install_mp_handler()
    f = MultiProcessLogFilter()

    # Wraps is needed here so apply / apply_async know the function name
    @wraps(fn)
    def wrapper(*args, **kwargs):
        logger.removeFilter(f)
        logger.addFilter(f)
        return fn(*args, **kwargs)

    return wrapper


# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
    logger.info(f'test function called via: {argument}')


# You can also redefine undecored functions
def undecorated_function():
    logger.info('I am not decorated')


@logs_mp_process_names
def redecorated(*args, **kwargs):
    return undecorated_function(*args, **kwargs)


# Enjoy
if __name__ == '__main__':
    with multiprocessing.Pool() as mp_pool:
        # Also works with apply_async
        mp_pool.apply(test, ('mp pool',))
        mp_pool.apply(redecorated)
        logger.info('some main logs')
        test('main program')
于 2020-01-15T22:51:15.583 に答える
0

loggingモジュール内のロック、スレッド、およびフォークの組み合わせでデッドロックが発生している場合は、バグ レポート 6721で報告されています (関連する SO の質問も参照してください)。

ここに投稿された小さな修正ソリューションがあります。

ただし、これにより、潜在的なデッドロックが修正されるだけloggingです。物事が文字化けしている可能性があることは修正されません。ここに示されている他の回答を参照してください。

于 2015-03-26T12:04:40.203 に答える