Python中logging在多进程环境下打印日志

时间:2021-06-19 10:08:31

由于涉及进程间的互斥与通信问题，Python的logging模块默认并不支持多个进程安全地向同一个日志文件写入日志。查阅官方文档可以发现，其中推荐了一种利用logging.handlers.SocketHandler来实现多进程日志打印的方案。

其原理很简单，一句话概括就是：多个进程将各自产生的日志通过Socket发送给一个专门负责打印日志的进程，由它统一输出，从而避免多进程同时打印造成的冲突与混乱。

本文主要记录SocketHandler在实际项目中的用法：

1 时序图

简单说明下逻辑:主进程(MainProcess)启动一个专门打印日志的进程(LogReceiverProcess),并且将自己(主进程)环境下的日志都“重定向”给LogReceiverProcess。同理,在后续逻辑中启动的所有工作子进程(WorkerProcess)都做一样的操作,把自己环境下的日志都“重定向”给日志进程去打印。

（时序图：主进程与各工作子进程通过SocketHandler把日志发送给日志进程LogReceiverProcess统一打印）

2 实现代码

2.1 日志进程

  日志进程代码的核心是建立一个TCP Server，用来接收并处理其他进程发来的LogRecord，代码如下：

 import os
import logging
import logging.handlers
import traceback
import struct

try:  # Python 2
    import cPickle as pickle
    import SocketServer as socketserver
except ImportError:  # Python 3: modules were renamed
    import pickle
    import socketserver
from multiprocessing import Process


class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Handle one client connection that streams pickled log records.

    Every record on the wire is a 4-byte big-endian length prefix followed by
    that many bytes of a pickled dict of LogRecord attributes, which is rebuilt
    with logging.makeLogRecord() and handed to a local logger.
    """

    def handle(self):
        """Read and dispatch records until the peer disconnects or sends garbage."""
        while True:
            header = self._recv_exact(4)
            if header is None:
                break
            (length,) = struct.unpack(">L", header)
            payload = self._recv_exact(length)
            if payload is None:
                # Peer went away in the middle of a record. A plain
                # "while len(chunk) < slen" loop would spin forever here,
                # because recv() keeps returning an empty string after EOF.
                break
            try:
                obj = self.unpickle(payload)
                record = logging.makeLogRecord(obj)
                self.handle_log_record(record)
            except Exception:
                # Drop this connection but leave a trace instead of
                # swallowing the error silently.
                traceback.print_exc()
                break

    def _recv_exact(self, size):
        """Return exactly ``size`` bytes from the connection, or None on EOF/socket error."""
        data = b""
        while len(data) < size:
            try:
                chunk = self.connection.recv(size - len(data))
            except (IOError, OSError):  # socket.error on both Python 2 and 3
                return None
            if not chunk:
                return None
            data += chunk
        return data

    @classmethod
    def unpickle(cls, data):
        """Deserialize one record payload.

        SECURITY: pickle.loads can execute arbitrary code embedded in ``data``.
        This is only tolerable because the receiver binds to 'localhost' by
        default and its clients are assumed to be our own SocketHandlers;
        never expose this server to untrusted peers.
        """
        return pickle.loads(data)

    def handle_log_record(self, record):
        """Pass ``record`` to ``server.logname``'s logger, or to ``record.name``'s if unset."""
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name
        logging.getLogger(name).handle(record)


class LogRecordSocketReceiver(socketserver.ThreadingTCPServer):
    """Threaded TCP server that accepts connections from logging SocketHandlers."""

    allow_reuse_address = 1

    def __init__(self, host='localhost', port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
                 handler=LogRecordStreamHandler):
        socketserver.ThreadingTCPServer.__init__(self, (host, port), handler)
        self.abort = 0
        self.timeout = 1
        self.logname = None

    def serve_until_stopped(self):
        """Serve requests until ``self.abort`` becomes truthy.

        select() with ``self.timeout`` wakes the loop at least once per timeout
        so that ``abort`` is re-checked even when no client connects.
        """
        import select
        abort = 0
        while not abort:
            readable, _, _ = select.select([self.socket.fileno()], [], [], self.timeout)
            if readable:
                self.handle_request()
            abort = self.abort


def _log_listener_process(log_format, log_time_format, log_file):
    """Entry point of the dedicated logging process.

    Writes every received record at DEBUG and above to ``log_file`` (append
    mode) and echoes INFO and above to the console, then serves requests.
    Nothing here sets ``abort``, so it runs until the process is terminated.

    Args:
        log_format: logging format string used for both file and console.
        log_time_format: strftime format for ``%(asctime)s``.
        log_file: path of the log file to append to.
    """
    log_file = os.path.realpath(log_file)
    root = logging.getLogger()
    # With the fork start method this process inherits the parent's root
    # handlers (e.g. a SocketHandler aimed at this very server, if the parent
    # attached one before starting us). basicConfig() silently does nothing
    # when handlers already exist, and an inherited SocketHandler would send
    # every handled record straight back here in an endless loop, so drop
    # them first. They are removed, not closed: closing could flush the
    # parent's buffered data a second time.
    for inherited in list(root.handlers):
        root.removeHandler(inherited)
    logging.basicConfig(level=logging.DEBUG, format=log_format, datefmt=log_time_format,
                        filename=log_file, filemode='a+')
    # Console output for INFO and above, in addition to the file.
    console = logging.StreamHandler()
    console.setLevel(logging.INFO)
    console.setFormatter(logging.Formatter(fmt=log_format, datefmt=log_time_format))
    root.addHandler(console)
    tcp_server = LogRecordSocketReceiver()
    logging.debug('Log listener process started ...')
    tcp_server.serve_until_stopped()

  关键点:

（1）TCP Server的构建逻辑，以及按“4字节大端长度前缀 + pickle数据”的格式拆包并还原LogRecord；

（2）在日志进程中设定好logging的记录级别和输出方式：这里除了写入日志文件（DEBUG及以上级别），还添加了Console输出（INFO及以上级别）。

2.2 其他进程

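  除日志进程之外的其他进程，都要把logging“重定向”给日志进程，并且要关闭当前进程自己的Console日志输出，否则Console上的日志会重复、凌乱。这里的做法是先给根logger添加一个只输出FATAL级别的Console Handler：根logger没有任何Handler时，logging.info()等函数会自动调用basicConfig()创建默认的Console Handler（显示WARNING及以上级别的日志），预先添加一个Handler就可以避免这种情况。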

 class LogHelper:
     """Process-wide logging setup that forwards records to a listener process.

     init() points the root logger at a SocketHandler on localhost and starts
     the 'LogRecorder' process (running _log_listener_process) that writes the
     records to the log file. All state is kept in class attributes and the
     class is used only through its static methods.
     """

     # Default log directory: ../logs relative to this source file.
     default_log_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', 'logs')
     # Log directory actually in use (set by init()).
     current_log_path = ''
     # Full path of the log file actually in use (set by init()).
     current_log_file = ''
     # Record format, also passed to the listener process.
     log_format = '[%(asctime)s.%(msecs)03d][%(processName)s][%(levelname)s][%(filename)s:%(lineno)d] %(message)s'
     # Time format for %(asctime)s.
     log_time_format = '%Y%m%d %H:%M:%S'
     # multiprocessing.Process of the listener, or None when not started.
     log_process = None
     # Console tag per level for print_console_log(); any other level prints as [INFO].
     _LEVEL_TAGS = {
         logging.WARN: '[WARN]',
         logging.ERROR: '[ERROR]',
         logging.FATAL: '[FATAL]',
     }

     def __init__(self):
         pass

     @staticmethod
     def print_console_log(level, message):
         """Print ``message`` to stdout between separator lines, bypassing logging."""
         separator = '--------------------------------------------------'
         print(separator)
         print('\t%s %s' % (LogHelper._LEVEL_TAGS.get(level, '[INFO]'), message))
         print(separator)

     @staticmethod
     def init(clear_logs=True, log_path=''):
         """Configure root logging for this process and start the listener process.

         Args:
             clear_logs: delete old files in the log directory before starting.
             log_path: log directory; '' means ``default_log_path``.

         Errors are printed via print_console_log() and are not re-raised.
         """
         # With no handlers on the root logger, logging.info() & co. call
         # basicConfig() and print WARNING+ records to the console. A FATAL-only
         # console handler prevents that so this process stays quiet.
         console = logging.StreamHandler()
         console.setLevel(logging.FATAL)
         logging.getLogger().addHandler(console)
         try:
             if log_path == '':
                 log_path = LogHelper.default_log_path
             if not os.path.exists(log_path):
                 os.makedirs(log_path)
             LogHelper.current_log_path = log_path
             # Remove old logs, then pick (or create the name of) the current file.
             if clear_logs:
                 LogHelper.clear_old_log_files()
             LogHelper.current_log_file = LogHelper._get_latest_log_file()
             root = logging.getLogger()
             root.setLevel(logging.DEBUG)
             # Start the listener *before* attaching the SocketHandler: with the
             # fork start method the child inherits the root handlers, and an
             # inherited SocketHandler would send the listener's own records
             # back to itself.
             LogHelper.start()
             socket_handler = logging.handlers.SocketHandler('localhost', logging.handlers.DEFAULT_TCP_LOGGING_PORT)
             root.addHandler(socket_handler)
         except Exception as ex:
             LogHelper.print_console_log(logging.FATAL, 'init() exception: %s' % str(ex))
             traceback.print_exc()

     @staticmethod
     def start():
         """Start the listener process unless one is already recorded in ``log_process``."""
         if LogHelper.log_process is not None:
             return
         LogHelper.log_process = Process(
             target=_log_listener_process,
             name='LogRecorder',
             args=(LogHelper.log_format, LogHelper.log_time_format, LogHelper.current_log_file),
         )
         LogHelper.log_process.start()

     @staticmethod
     def stop():
         """Terminate the listener process (if any) and wait for it to exit."""
         if LogHelper.log_process is None:
             return
         LogHelper.log_process.terminate()
         LogHelper.log_process.join()
         # Forget the handle so a later start() can launch a fresh listener.
         LogHelper.log_process = None

     @staticmethod
     def _get_latest_log_file():
         """Return the path of the log file to use.

         Picks the lexicographically greatest file path under current_log_path
         (the newest one when files follow the system_<timestamp>.log naming).
         If there is none, returns a new system_YYYYmmdd_HHMMSS.log path in that
         directory. On error the exception is logged and whatever was computed
         so far is returned.
         """
         import time  # local import so this method does not rely on a module-level `import time`
         latest_log_file = ''
         try:
             if os.path.exists(LogHelper.current_log_path):
                 for maindir, _, file_name_list in os.walk(LogHelper.current_log_path):
                     for file_name in file_name_list:
                         apath = os.path.join(maindir, file_name)
                         if apath > latest_log_file:
                             latest_log_file = apath
             if latest_log_file == '':
                 stamp = time.strftime("%Y%m%d_%H%M%S", time.localtime())
                 latest_log_file = os.path.join(LogHelper.current_log_path, 'system_' + stamp + '.log')
         except Exception as ex:
             logging.error('EXCEPTION: %s', ex)
             traceback.print_exc()
         # Returned outside `finally` so KeyboardInterrupt/SystemExit still propagate.
         return latest_log_file

     @staticmethod
     def get_log_file():
         """Return the log file path chosen by init()."""
         return LogHelper.current_log_file

     @staticmethod
     def clear_old_log_files():
         """Delete every file under current_log_path except current_log_file, which is truncated.

         Logs a warning and returns if the directory does not exist; other errors
         are logged and not raised.
         """
         if not os.path.exists(LogHelper.current_log_path):
             logging.warning('clear_old_log_files() Not exist: %s', LogHelper.current_log_path)
             return
         try:
             for maindir, _, file_name_list in os.walk(LogHelper.current_log_path):
                 for file_name in file_name_list:
                     apath = os.path.join(maindir, file_name)
                     if apath != LogHelper.current_log_file:
                         logging.info('DEL -> %s', apath)
                         os.remove(apath)
                     else:
                         # Keep the current file but empty it.
                         with open(LogHelper.current_log_file, 'w'):
                             pass
             logging.debug('Clear log done.')
         except Exception as ex:
             logging.error('EXCEPTION: %s', ex)
             traceback.print_exc()