Python에서 멀티프로세싱을 사용하는 동안 어떻게 기록해야 합니까?
현재 저는 Python 2.6 모듈을 사용하여 여러 프로세스를 생성하는 중앙 모듈을 프레임워크에 가지고 있습니다.사용하기 때문에multiprocessing 레벨의 로그, 「Multiproute-level multiprocessing-aware log 」 ) 。LOG = multiprocessing.get_logger(). 문서에 따르면 이 로거(EDIT)에는 프로세스 공유 잠금이 없기 때문에 로거(EDIT)에 문제가 발생하지 않습니다.sys.stderr(또는 어떤 파일 처리든) 여러 프로세스가 동시에 파일에 쓰도록 함으로써 실현됩니다.
현재 문제는 프레임워크의 다른 모듈이 멀티프로세싱을 인식하지 못한다는 것입니다.이 중앙 모듈에 대한 모든 의존관계에서 멀티프로세싱 인식 로깅을 사용해야 한다고 생각합니다.프레임워크 내에서는 귀찮은 일이지만 프레임워크의 모든 클라이언트는 말할 것도 없습니다.내가 생각하고 있지 않은 대안이 있나요?
파이프를 통해 모든 것을 부모 프로세스에 공급하는 로그 핸들러를 방금 작성했습니다.테스트한지 10분밖에 안 됐는데 잘 되는 것 같아요.
(주의: 이 코드는RotatingFileHandler(본격적으로)
업데이트: @internals는 이 접근방식을 Pypi에서 사용할 수 있는 패키지로 유지하고 있습니다.https://github.com/jruere/multiprocessing-logging의 github, Pypi에서 multiprocessing-flashing을 참조하십시오.
업데이트: 구현!
이제 큐를 사용하여 올바른 동시 처리를 수행하고 오류로부터 올바르게 복구됩니다.몇 달째 사용하고 있습니다만, 아래의 버전은 문제없이 동작하고 있습니다.
from logging.handlers import RotatingFileHandler
import multiprocessing, threading, logging, sys, traceback
class MultiProcessingLog(logging.Handler):
def __init__(self, name, mode, maxsize, rotate):
logging.Handler.__init__(self)
self._handler = RotatingFileHandler(name, mode, maxsize, rotate)
self.queue = multiprocessing.Queue(-1)
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
while True:
try:
record = self.queue.get()
self._handler.emit(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
def send(self, s):
self.queue.put_nowait(s)
def _format_record(self, record):
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe
if record.args:
record.msg = record.msg % record.args
record.args = None
if record.exc_info:
dummy = self.format(record)
record.exc_info = None
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
self._handler.close()
logging.Handler.close(self)
이 문제를 비침해적으로 해결하는 유일한 방법은 다음과 같습니다.
- 로그가 다른 파일 기술자(디스크 또는 파이프)로 전송되도록 각 워커 프로세스를 생성합니다.모든 로그 엔트리에 타임스탬프를 지정하는 것이 이상적입니다.
- 컨트롤러 프로세스는 다음 중 하나를 수행할 수 있습니다.
- 디스크 파일을 사용하는 경우: 실행 종료 시 타임스탬프별로 정렬된 로그 파일을 병합합니다.
- 파이프를 사용하는 경우(권장):모든 파이프에서 즉시 중앙 로그 파일로 로그 엔트리를 병합합니다(예를 들어 파이프의 파일 기술자에서 정기적으로 로그 항목을 병합하고 사용 가능한 로그 엔트리에 대해 병합 정렬을 수행하고 중앙 로그로 플러시합니다).반복합니다.)
QueueHandler Python 3.2+가 네이티브이며, 바로 이 기능을 합니다.이전 버전에서는 쉽게 복제할 수 있습니다.
Python docs에는 두 가지 완전한 예가 있습니다. 여러 프로세스에서 단일 파일로 로깅하는 것입니다.
Python .2는, < 3.2 의 카피만 하면 됩니다.QueueHandlerhttps://gist.github.com/vsajip/591589 또는 import logutils에서 자체 코드로 변환합니다.
프로세스프로세스 는을 에 합니다.Queue에 a, 그리고 a, 그리고 a.listener스레드 또는 프로세스(각 예에 대해 1개씩 제공됨)는 이러한 항목을 선택하여 모두 파일에 쓰기 때문에 손상이나 왜곡의 위험이 없습니다.
아래는 구글에서 온 다른 모든 사람을 위한 단순성에 초점을 맞춘 또 다른 솔루션입니다.로깅은 간단해야 합니다!3.2 이상일 경우에만.
import multiprocessing
import logging
from logging.handlers import QueueHandler, QueueListener
import time
import random
def f(i):
time.sleep(random.uniform(.01, .05))
logging.info('function called with {} in worker thread.'.format(i))
time.sleep(random.uniform(.01, .05))
return i
def worker_init(q):
# all records from worker processes go to qh and then into q
qh = QueueHandler(q)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.addHandler(qh)
def logger_init():
q = multiprocessing.Queue()
# this is the handler for all log records
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(levelname)s: %(asctime)s - %(process)s - %(message)s"))
# ql gets records from the queue and sends them to the handler
ql = QueueListener(q, handler)
ql.start()
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# add the handler to the logger so records from this process are handled
logger.addHandler(handler)
return ql, q
def main():
q_listener, q = logger_init()
logging.info('hello from main thread')
pool = multiprocessing.Pool(4, worker_init, [q])
for result in pool.map(f, range(10)):
pass
pool.close()
pool.join()
q_listener.stop()
if __name__ == '__main__':
main()
또 다른 대안으로는 패키지 내의 다양한 비파일 기반 로깅 핸들러를 들 수 있습니다.
SocketHandlerDatagramHandlerSyslogHandler
(기타)
이렇게 하면 안전하게 쓸 수 있고 결과를 올바르게 처리할 수 있는 곳에 로깅 데몬을 쉽게 배치할 수 있습니다(예를 들어 메시지를 개봉하여 회전하는 파일 핸들러에 보내는 단순한 소켓 서버).
SyslogHandler물론 .syslog을 사용법
2020년에는 멀티프로세싱으로 로깅하는 방법이 더 간단해 보입니다.
이 함수는 로거를 만듭니다.여기서 포맷을 설정할 수 있습니다.또, 출력을 송신할 장소(file, stdout)도 설정할 수 있습니다.
def create_logger():
import multiprocessing, logging
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(\
'[%(asctime)s| %(levelname)s| %(processName)s] %(message)s')
handler = logging.FileHandler('logs/your_file_name.log')
handler.setFormatter(formatter)
# this bit will make sure you won't have
# duplicated messages in the output
if not len(logger.handlers):
logger.addHandler(handler)
return logger
초기화 시 로거를 인스턴스화합니다.
if __name__ == '__main__':
from multiprocessing import Pool
logger = create_logger()
logger.info('Starting pooling')
p = Pool()
# rest of the code
이제 로깅이 필요한 각 기능에 이 참조만 추가하면 됩니다.
logger = create_logger()
출력 메시지:
logger.info(f'My message from {something}')
이게 도움이 됐으면 좋겠다.
로깅 스레드와 큐스레드를 분리하여 유지하는 다른 변종입니다.
"""sample code for logging in subprocesses using multiprocessing
* Little handler magic - The main process uses loggers and handlers as normal.
* Only a simple handler is needed in the subprocess that feeds the queue.
* Original logger name from subprocess is preserved when logged in main
process.
* As in the other implementations, a thread reads the queue and calls the
handlers. Except in this implementation, the thread is defined outside of a
handler, which makes the logger definitions simpler.
* Works with multiple handlers. If the logger in the main process defines
multiple handlers, they will all be fed records generated by the
subprocesses loggers.
tested with Python 2.5 and 2.6 on Linux and Windows
"""
import os
import sys
import time
import traceback
import multiprocessing, threading, logging, sys
DEFAULT_LEVEL = logging.DEBUG
formatter = logging.Formatter("%(levelname)s: %(asctime)s - %(name)s - %(process)s - %(message)s")
class SubProcessLogHandler(logging.Handler):
"""handler used by subprocesses
It simply puts items on a Queue for the main process to log.
"""
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
self.queue.put(record)
class LogQueueReader(threading.Thread):
"""thread to write subprocesses log records to main process log
This thread reads the records written by subprocesses and writes them to
the handlers defined in the main process's handlers.
"""
def __init__(self, queue):
threading.Thread.__init__(self)
self.queue = queue
self.daemon = True
def run(self):
"""read from the queue and write to the log handlers
The logging documentation says logging is thread safe, so there
shouldn't be contention between normal logging (from the main
process) and this thread.
Note that we're using the name of the original logger.
"""
# Thanks Mike for the error checking code.
while True:
try:
record = self.queue.get()
# get the logger for this record
logger = logging.getLogger(record.name)
logger.callHandlers(record)
except (KeyboardInterrupt, SystemExit):
raise
except EOFError:
break
except:
traceback.print_exc(file=sys.stderr)
class LoggingProcess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
def _setupLogger(self):
# create the logger to use.
logger = logging.getLogger('test.subprocess')
# The only handler desired is the SubProcessLogHandler. If any others
# exist, remove them. In this case, on Unix and Linux the StreamHandler
# will be inherited.
for handler in logger.handlers:
# just a check for my sanity
assert not isinstance(handler, SubProcessLogHandler)
logger.removeHandler(handler)
# add the handler
handler = SubProcessLogHandler(self.queue)
handler.setFormatter(formatter)
logger.addHandler(handler)
# On Windows, the level will not be inherited. Also, we could just
# set the level to log everything here and filter it in the main
# process handlers. For now, just set it from the global default.
logger.setLevel(DEFAULT_LEVEL)
self.logger = logger
def run(self):
self._setupLogger()
logger = self.logger
# and here goes the logging
p = multiprocessing.current_process()
logger.info('hello from process %s with pid %s' % (p.name, p.pid))
if __name__ == '__main__':
# queue used by the subprocess loggers
queue = multiprocessing.Queue()
# Just a normal logger
logger = logging.getLogger('test')
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.setLevel(DEFAULT_LEVEL)
logger.info('hello from the main process')
# This thread will read from the subprocesses and write to the main log's
# handlers.
log_queue_reader = LogQueueReader(queue)
log_queue_reader.start()
# create the processes.
for i in range(10):
p = LoggingProcess(queue)
p.start()
# The way I read the multiprocessing warning about Queue, joining a
# process before it has finished feeding the Queue can cause a deadlock.
# Also, Queue.empty() is not realiable, so just make sure all processes
# are finished.
# active_children joins subprocesses when they're finished.
while multiprocessing.active_children():
time.sleep(.1)
현재 솔루션은 모두 핸들러를 사용하여 로깅 구성에 너무 많이 결합되어 있습니다.이 솔루션은 다음과 같은 아키텍처와 기능을 갖추고 있습니다.
- 원하는 로깅 구성을 사용할 수 있습니다.
- 로깅은 데몬 스레드로 수행됩니다.
- 컨텍스트 매니저를 사용하여 데몬을 안전하게 셧다운합니다.
- 에 대한 은 깅깅음음 음음음음 음음음음 음음음음 commun commun commun commun commun commun commun 。
multiprocessing.Queue - 「」의
logging.Logger(및 이미 정의된 인스턴스)는 모든 레코드를 큐로 전송하도록 패치됩니다. - New: 피클링 오류를 방지하기 위해 큐로 전송하기 전에 트레이스백 및 메시지를 포맷합니다.
사용 예시와 출력이 포함된 코드는 다음 Gist에서 찾을 수 있습니다.https://gist.github.com/schlamar/7003737
퍼블리셔 수 및1개의 서브스크라이버(리스너)의 멀티프로세싱 로깅을 나타낼 수 있기 때문에 제로MQ를 사용하여 PUB-SUB 메시지를 구현하는 것도 하나의 옵션입니다.
또한 ZMQ용 Python 바인딩인 PyZMQ 모듈은 zmq를 통해 로깅 메시지를 게시하기 위한 객체인 PUBHandler를 구현합니다.PUB 소켓
웹에는 PyZMQ 및 PUBHandler를 사용하여 분산된 응용 프로그램에서 중앙 집중식 로깅을 수행할 수 있는 솔루션이 있습니다. 이 솔루션은 여러 게시 프로세스를 통해 로컬에서 작업할 때 쉽게 채택할 수 있습니다.
formatters = {
logging.DEBUG: logging.Formatter("[%(name)s] %(message)s"),
logging.INFO: logging.Formatter("[%(name)s] %(message)s"),
logging.WARN: logging.Formatter("[%(name)s] %(message)s"),
logging.ERROR: logging.Formatter("[%(name)s] %(message)s"),
logging.CRITICAL: logging.Formatter("[%(name)s] %(message)s")
}
# This one will be used by publishing processes
class PUBLogger:
def __init__(self, host, port=config.PUBSUB_LOGGER_PORT):
self._logger = logging.getLogger(__name__)
self._logger.setLevel(logging.DEBUG)
self.ctx = zmq.Context()
self.pub = self.ctx.socket(zmq.PUB)
self.pub.connect('tcp://{0}:{1}'.format(socket.gethostbyname(host), port))
self._handler = PUBHandler(self.pub)
self._handler.formatters = formatters
self._logger.addHandler(self._handler)
@property
def logger(self):
return self._logger
# This one will be used by listener process
class SUBLogger:
def __init__(self, ip, output_dir="", port=config.PUBSUB_LOGGER_PORT):
self.output_dir = output_dir
self._logger = logging.getLogger()
self._logger.setLevel(logging.DEBUG)
self.ctx = zmq.Context()
self._sub = self.ctx.socket(zmq.SUB)
self._sub.bind('tcp://*:{1}'.format(ip, port))
self._sub.setsockopt(zmq.SUBSCRIBE, "")
handler = handlers.RotatingFileHandler(os.path.join(output_dir, "client_debug.log"), "w", 100 * 1024 * 1024, 10)
handler.setLevel(logging.DEBUG)
formatter = logging.Formatter("%(asctime)s;%(levelname)s - %(message)s")
handler.setFormatter(formatter)
self._logger.addHandler(handler)
@property
def sub(self):
return self._sub
@property
def logger(self):
return self._logger
# And that's the way we actually run things:
# Listener process will forever listen on SUB socket for incoming messages
def run_sub_logger(ip, event):
sub_logger = SUBLogger(ip)
while not event.is_set():
try:
topic, message = sub_logger.sub.recv_multipart(flags=zmq.NOBLOCK)
log_msg = getattr(logging, topic.lower())
log_msg(message)
except zmq.ZMQError as zmq_error:
if zmq_error.errno == zmq.EAGAIN:
pass
# Publisher processes loggers should be initialized as follows:
class Publisher:
def __init__(self, stop_event, proc_id):
self.stop_event = stop_event
self.proc_id = proc_id
self._logger = pub_logger.PUBLogger('127.0.0.1').logger
def run(self):
self._logger.info("{0} - Sending message".format(proc_id))
def run_worker(event, proc_id):
worker = Publisher(event, proc_id)
worker.run()
# Starting subscriber process so we won't loose publisher's messages
sub_logger_process = Process(target=run_sub_logger,
args=('127.0.0.1'), stop_event,))
sub_logger_process.start()
#Starting publisher processes
for i in range(MAX_WORKERS_PER_CLIENT):
processes.append(Process(target=run_worker,
args=(stop_event, i,)))
for p in processes:
p.start()
나도 zzeek의 답변을 좋아하지만, Andre의 답변은 왜곡을 방지하기 위해 대기열이 필요하다는 것이 옳다.파이프는 운이 좋았지만, 어느 정도 예상된 왜곡을 볼 수 있었습니다.구현은 생각보다 어려웠습니다. 특히 글로벌 변수와 기타 제한 사항이 있는 Windows에서 실행 중이기 때문입니다(Windows에서 Python Multiprocessing을 구현하는 방법 참조).
하지만, 나는 마침내 그것을 해냈다.이 예는 완벽하지 않을 수 있으므로 의견과 제안을 환영합니다.또한 포맷터나 루트 로거 이외의 설정도 지원하지 않습니다.기본적으로 큐를 사용하여 각 풀프로세스에서 로거를 재초기화하고 로거에 다른 Atribut을 설정해야 합니다.
다시 한 번, 코드를 개선하는 방법에 대한 어떠한 제안도 환영합니다.Python의 트릭은 아직 잘 모릅니다. :-)
import multiprocessing, logging, sys, re, os, StringIO, threading, time, Queue
class MultiProcessingLogHandler(logging.Handler):
def __init__(self, handler, queue, child=False):
logging.Handler.__init__(self)
self._handler = handler
self.queue = queue
# we only want one of the loggers to be pulling from the queue.
# If there is a way to do this without needing to be passed this
# information, that would be great!
if child == False:
self.shutdown = False
self.polltime = 1
t = threading.Thread(target=self.receive)
t.daemon = True
t.start()
def setFormatter(self, fmt):
logging.Handler.setFormatter(self, fmt)
self._handler.setFormatter(fmt)
def receive(self):
#print "receive on"
while (self.shutdown == False) or (self.queue.empty() == False):
# so we block for a short period of time so that we can
# check for the shutdown cases.
try:
record = self.queue.get(True, self.polltime)
self._handler.emit(record)
except Queue.Empty, e:
pass
def send(self, s):
# send just puts it in the queue for the server to retrieve
self.queue.put(s)
def _format_record(self, record):
ei = record.exc_info
if ei:
dummy = self.format(record) # just to get traceback text into record.exc_text
record.exc_info = None # to avoid Unpickleable error
return record
def emit(self, record):
try:
s = self._format_record(record)
self.send(s)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def close(self):
time.sleep(self.polltime+1) # give some time for messages to enter the queue.
self.shutdown = True
time.sleep(self.polltime+1) # give some time for the server to time out and see the shutdown
def __del__(self):
self.close() # hopefully this aids in orderly shutdown when things are going poorly.
def f(x):
# just a logging command...
logging.critical('function number: ' + str(x))
# to make some calls take longer than others, so the output is "jumbled" as real MP programs are.
time.sleep(x % 3)
def initPool(queue, level):
"""
This causes the logging module to be initialized with the necessary info
in pool threads to work correctly.
"""
logging.getLogger('').addHandler(MultiProcessingLogHandler(logging.StreamHandler(), queue, child=True))
logging.getLogger('').setLevel(level)
if __name__ == '__main__':
stream = StringIO.StringIO()
logQueue = multiprocessing.Queue(100)
handler= MultiProcessingLogHandler(logging.StreamHandler(stream), logQueue)
logging.getLogger('').addHandler(handler)
logging.getLogger('').setLevel(logging.DEBUG)
logging.debug('starting main')
# when bulding the pool on a Windows machine we also have to init the logger in all the instances with the queue and the level of logging.
pool = multiprocessing.Pool(processes=10, initializer=initPool, initargs=[logQueue, logging.getLogger('').getEffectiveLevel()] ) # start worker processes
pool.map(f, range(0,50))
pool.close()
logging.debug('done')
logging.shutdown()
print "stream output is:"
print stream.getvalue()
logger_tt 라이브러리를 사용할 것을 제안합니다.https://github.com/Dragon2fly/logger_tt
macOSX에서는 multiporcessing_logging 라이브러리가 동작하지 않지만 logger_tt에서는 동작합니다.
가 API를 사용하여 할 때 API를 사용하지 .import multiprocessing.
나는 zzeek의 대답이 좋았다.여러 스레드/프로세스가 동일한 파이프 엔드를 사용하여 로그 메시지를 생성하면 왜곡되므로 큐를 파이프로 대체합니다.
동시 로그 처리자는 완벽하게 작업을 수행하는 것 같습니다.Windows에서 테스트 완료.POSIX 시스템도 지원합니다.
주요 아이디어
- 로거를 반환하는 함수로 별도의 파일을 만듭니다.합니다.
ConcurrentRotatingFileHandler각 프로세스에 대해서요. 함수 " " " "get_logger()아래에 기재되어 있습니다. - 로거 작성은 프로세스 초기화 시 이루어집니다.의
multiprocessing.Process라고 하는 은, 「시작」의 「」의 의미입니다.run()★★★★★★ 。
상세 절차
이 예에서는 다음 파일 구조를 사용합니다.
.
│-- child.py <-- For a child process
│-- logs.py <-- For setting up the logs for the app
│-- main.py <-- For a main process
│-- myapp.py <-- For starting the app
│-- somemodule.py <-- For an example, a "3rd party module using standard logging"
코드
자처리
# child.py
import multiprocessing as mp
import time
from somemodule import do_something
class ChildProcess(mp.Process):
def __init__(self):
self.logger = None
super().__init__()
def run(self):
from logs import get_logger
self.logger = get_logger()
while True:
time.sleep(1)
self.logger.info("Child process")
do_something()
- 「」를 계승하는 한 자 .
multiprocessing.Processprocess에 됩니다. - 중요:그
get_logger()불려요.run()또는 자 프로세스 내의 다른 부분(모듈레벨이 아님)__init__()은 로 합니다.)get_logger()작성하다ConcurrentRotatingFileHandler각 프로세스에 대해 새로운 인스턴스가 필요합니다. do_something.recurrent-logrecommunity, concurrent-log-recommunity, concurrent-log-recommunity를 사용하고 하고 있음을 만 사용됩니다.서드 파티제 라이브러리 코드는, 동시 로그 취득을 사용하고 있는 것을 알 수 없습니다.
메인 프로세스
# main.py
import logging
import multiprocessing as mp
import time
from child import ChildProcess
from somemodule import do_something
class MainProcess(mp.Process):
def __init__(self):
self.logger = logging.getLogger()
super().__init__()
def run(self):
from logs import get_logger
self.logger = get_logger()
self.child = ChildProcess()
self.child.daemon = True
self.child.start()
while True:
time.sleep(0.5)
self.logger.critical("Main process")
do_something()
- 1초에 2회 파일에 로그인하는 메인 프로세스., 「」로부터 됩니다.
multiprocessing.Process. - 같은 코멘트
get_logger()★★★★★★★★★★★★★★★★★」do_something()이치노
로거 셋업
# logs.py
import logging
import os
from concurrent_log_handler import ConcurrentRotatingFileHandler
LOGLEVEL = logging.DEBUG
def get_logger():
logger = logging.getLogger()
if logger.handlers:
return logger
# Use an absolute path to prevent file rotation trouble.
logfile = os.path.abspath("mylog.log")
logger.setLevel(LOGLEVEL)
# Rotate log after reaching 512K, keep 5 old copies.
filehandler = ConcurrentRotatingFileHandler(
logfile, mode="a", maxBytes=512 * 1024, backupCount=5, encoding="utf-8"
)
filehandler.setLevel(LOGLEVEL)
# create also handler for displaying output in the stdout
ch = logging.StreamHandler()
ch.setLevel(LOGLEVEL)
formatter = logging.Formatter(
"%(asctime)s - %(module)s - %(levelname)s - %(message)s [Process: %(process)d, %(filename)s:%(funcName)s(%(lineno)d)]"
)
# add formatter to ch
ch.setFormatter(formatter)
filehandler.setFormatter(formatter)
logger.addHandler(ch)
logger.addHandler(filehandler)
return logger
- 은 「」를 합니다.
ConcurrentRotatingFileHandler패키지concurrent-log-concertificate에서 합니다.Concurrent Rotating File Handler 입니다. - 는 " " 입니다.
ConcurrentRotatingFileHandler모든 프로세스에서 동일해야 합니다.
앱 예시
# myapp.py
if __name__ == "__main__":
from main import MainProcess
p = MainProcess()
p.start()
- 다중 처리 응용 프로그램을 시작하는 방법에 대한 간단한 예제입니다.
을 표준으로 한 예logging
# somemodule.py
import logging
logger = logging.getLogger("somemodule")
def do_something():
logging.info("doing something")
- 서드파티 코드의 로거가 정상적으로 동작하는지를 테스트하는 간단한 예를 나타냅니다.
출력 예
2021-04-19 19:02:29,425 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,427 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:29,929 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:29,931 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,133 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:30,137 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:30,436 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,439 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:30,944 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:30,946 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
2021-04-19 19:02:31,142 - child - INFO - Child process [Process: 76700, child.py:run(18)]
2021-04-19 19:02:31,145 - somemodule - INFO - doing something [Process: 76700, somemodule.py:do_something(7)]
2021-04-19 19:02:31,449 - main - CRITICAL - Main process [Process: 103348, main.py:run(23)]
2021-04-19 19:02:31,451 - somemodule - INFO - doing something [Process: 103348, somemodule.py:do_something(7)]
큐에서 모든 로그 항목을 읽는 다른 프로세스에 모든 로깅을 위임하는 것은 어떻습니까?
LOG_QUEUE = multiprocessing.JoinableQueue()
class CentralLogger(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
self.log = logger.getLogger('some_config')
self.log.info("Started Central Logging process")
def run(self):
while True:
log_level, message = self.queue.get()
if log_level is None:
self.log.info("Shutting down Central Logging process")
break
else:
self.log.log(log_level, message)
central_logger_process = CentralLogger(LOG_QUEUE)
central_logger_process.start()
멀티프로세스 메커니즘이나 상속을 통해 LOG_QUUE를 공유하기만 하면 모든 것이 잘 해결됩니다.
다음은 ActivePython이 필요한 Windows 환경에서 사용할 수 있는 클래스입니다.다른 로깅 핸들러(Stream Handler 등)에 대해서도 상속할 수 있습니다.
class SyncronizedFileHandler(logging.FileHandler):
MUTEX_NAME = 'logging_mutex'
def __init__(self , *args , **kwargs):
self.mutex = win32event.CreateMutex(None , False , self.MUTEX_NAME)
return super(SyncronizedFileHandler , self ).__init__(*args , **kwargs)
def emit(self, *args , **kwargs):
try:
win32event.WaitForSingleObject(self.mutex , win32event.INFINITE)
ret = super(SyncronizedFileHandler , self ).emit(*args , **kwargs)
finally:
win32event.ReleaseMutex(self.mutex)
return ret
다음으로 사용 예를 제시하겠습니다.
import logging
import random , time , os , sys , datetime
from string import letters
import win32api , win32event
from multiprocessing import Pool
def f(i):
time.sleep(random.randint(0,10) * 0.1)
ch = random.choice(letters)
logging.info( ch * 30)
def init_logging():
'''
initilize the loggers
'''
formatter = logging.Formatter("%(levelname)s - %(process)d - %(asctime)s - %(filename)s - %(lineno)d - %(message)s")
logger = logging.getLogger()
logger.setLevel(logging.INFO)
file_handler = SyncronizedFileHandler(sys.argv[1])
file_handler.setLevel(logging.INFO)
file_handler.setFormatter(formatter)
logger.addHandler(file_handler)
#must be called in the parent and in every worker process
init_logging()
if __name__ == '__main__':
#multiprocessing stuff
pool = Pool(processes=10)
imap_result = pool.imap(f , range(30))
for i , _ in enumerate(imap_result):
pass
Ironhacker와 유사한 솔루션이 있습니다.단, 일부 코드에서 logging.exception을 사용하고 있습니다.트레이스백은 피클되지 않기 때문에 큐에 반환하기 전에 예외를 포맷해야 합니다.
class QueueHandler(logging.Handler):
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def emit(self, record):
if record.exc_info:
# can't pass exc_info across processes so just format now
record.exc_text = self.formatException(record.exc_info)
record.exc_info = None
self.queue.put(record)
def formatException(self, ei):
sio = cStringIO.StringIO()
traceback.print_exception(ei[0], ei[1], ei[2], None, sio)
s = sio.getvalue()
sio.close()
if s[-1] == "\n":
s = s[:-1]
return s
에서 잠금,및, " " " " " " " " "logging모듈. 버그리포트 6721에 기재되어 있습니다(관련 SO 질문도 참조).
여기에 작은 수정 솔루션이 게시되어 있습니다.
이 을 사용하면, 「」, 「」, 「」의됩니다.logging그렇다고 해서 일이 엉망이 될 수도 있다는 것을 고칠 수는 없다.여기에 제시된 다른 답변을 참조하십시오.
여기 간단한 해킹/회피책이 있습니다.가장 포괄적이지는 않지만 쉽게 수정이 가능하고 읽기 쉽고 이해하기 쉽다고 생각합니다.
import logging
import multiprocessing
class FakeLogger(object):
def __init__(self, q):
self.q = q
def info(self, item):
self.q.put('INFO - {}'.format(item))
def debug(self, item):
self.q.put('DEBUG - {}'.format(item))
def critical(self, item):
self.q.put('CRITICAL - {}'.format(item))
def warning(self, item):
self.q.put('WARNING - {}'.format(item))
def some_other_func_that_gets_logger_and_logs(num):
# notice the name get's discarded
# of course you can easily add this to your FakeLogger class
local_logger = logging.getLogger('local')
local_logger.info('Hey I am logging this: {} and working on it to make this {}!'.format(num, num*2))
local_logger.debug('hmm, something may need debugging here')
return num*2
def func_to_parallelize(data_chunk):
# unpack our args
the_num, logger_q = data_chunk
# since we're now in a new process, let's monkeypatch the logging module
logging.getLogger = lambda name=None: FakeLogger(logger_q)
# now do the actual work that happens to log stuff too
new_num = some_other_func_that_gets_logger_and_logs(the_num)
return (the_num, new_num)
if __name__ == '__main__':
multiprocessing.freeze_support()
m = multiprocessing.Manager()
logger_q = m.Queue()
# we have to pass our data to be parallel-processed
# we also need to pass the Queue object so we can retrieve the logs
parallelable_data = [(1, logger_q), (2, logger_q)]
# set up a pool of processes so we can take advantage of multiple CPU cores
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=4)
worker_output = pool.map(func_to_parallelize, parallelable_data)
pool.close() # no more tasks
pool.join() # wrap up current tasks
# get the contents of our FakeLogger object
while not logger_q.empty():
print logger_q.get()
print 'worker output contained: {}'.format(worker_output)
이 멋진 패키지가 있습니다.
패키지: https://pypi.python.org/pypi/multiprocessing-logging/
코드: https://github.com/jruere/multiprocessing-logging
인스톨:
pip install multiprocessing-logging
그 후 다음을 추가합니다.
import multiprocessing_logging
# This enables logs inside process
multiprocessing_logging.install_mp_handler()
필요한 사용자를 위해 현재 프로세스 이름을 로그에 추가하는 multiprocessing_logging 패키지의 데코레이터를 작성했습니다.그러면 누가 무엇을 기록하는지 알 수 있습니다.
install_mp_handler()도 실행되므로 풀을 만들기 전에 실행해도 소용이 없습니다.
이를 통해 어떤 워커가 메시지를 기록하는지 확인할 수 있습니다.
다음은 설계도와 예시입니다.
import sys
import logging
from functools import wraps
import multiprocessing
import multiprocessing_logging
# Setup basic console logger as 'logger'
logger = logging.getLogger()
console_handler = logging.StreamHandler(sys.stdout)
console_handler.setFormatter(logging.Formatter(u'%(asctime)s :: %(levelname)s :: %(message)s'))
logger.setLevel(logging.DEBUG)
logger.addHandler(console_handler)
# Create a decorator for functions that are called via multiprocessing pools
def logs_mp_process_names(fn):
class MultiProcessLogFilter(logging.Filter):
def filter(self, record):
try:
process_name = multiprocessing.current_process().name
except BaseException:
process_name = __name__
record.msg = f'{process_name} :: {record.msg}'
return True
multiprocessing_logging.install_mp_handler()
f = MultiProcessLogFilter()
# Wraps is needed here so apply / apply_async know the function name
@wraps(fn)
def wrapper(*args, **kwargs):
logger.removeFilter(f)
logger.addFilter(f)
return fn(*args, **kwargs)
return wrapper
# Create a test function and decorate it
@logs_mp_process_names
def test(argument):
logger.info(f'test function called via: {argument}')
# You can also redefine undecored functions
def undecorated_function():
logger.info('I am not decorated')
@logs_mp_process_names
def redecorated(*args, **kwargs):
return undecorated_function(*args, **kwargs)
# Enjoy
if __name__ == '__main__':
with multiprocessing.Pool() as mp_pool:
# Also works with apply_async
mp_pool.apply(test, ('mp pool',))
mp_pool.apply(redecorated)
logger.info('some main logs')
test('main program')
중 는 이미 "Mutliprocessing logging"을 입니다.atexit이러한 프로세스에 참가하기 위한 핸들러는 그것을 stderr에서 다시 읽습니다.단, 이 방법으로는 stderr의 출력 메시지로의 실시간플로우를 얻을 수 없습니다.
전술한 바와 같이 가장 심플한 아이디어:
- 현재 프로세스의 파일 이름과 프로세스 ID를 가져옵니다.
[WatchedFileHandler][1]이 핸들러의 이유는 여기서 자세히 설명하지만 간단히 말하면 다른 로깅핸들러와의 레이스 상태가 더 나빠질 수 있습니다.이게 레이스 조건의 가장 짧은 창입니다.- 로그를 저장할 경로를 선택하십시오(예: /var/log/...).
언급URL : https://stackoverflow.com/questions/641420/how-should-i-log-while-using-multiprocessing-in-python
'programing' 카테고리의 다른 글
| Galera에서 마스터가 다운된 경우 마스터/슬레이브 리플리케이션 로드밸런싱 (0) | 2022.12.26 |
|---|---|
| Big Decimal을 정수로 변환 (0) | 2022.12.26 |
| 오류: 클라이언트에 보낸 후 헤더를 설정할 수 없습니다. (0) | 2022.12.26 |
| KEY 키워드의 의미는 무엇입니까? (0) | 2022.12.26 |
| PHP 파일이 일반 텍스트로 표시되는 이유는 무엇입니까? (0) | 2022.12.07 |