#!/usr/bin/env python # -*- coding: utf-8 -*- # time: 2023/3/4 22:28 # file: myLog.py # author: David # company: shenyang JY """ 1. 信息流以控制台和文件形式打印 2. 文件保存以启动日期为文件名 3. 控制台INFO,文件DEBUG """ import codecs from pathlib import Path import logging, logging.handlers, time, os, re from logging.handlers import BaseRotatingHandler class DailyRotatingFileHandler(BaseRotatingHandler): """ 同`logging.TimedRotatingFileHandler`类似,不过这个handler: - 可以支持多进程 - 只支持自然日分割 - 暂不支持UTC """ def __init__(self, filename, backupCount=0, encoding=None, delay=False, utc=False, **kwargs): self.backup_count = backupCount self.utc = utc self.suffix = "%Y-%m-%d" self.base_log_path = Path(filename) self.base_filename = self.base_log_path.name self.current_filename = self._compute_fn() self.current_log_path = self._compute_lp() BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay) def shouldRollover(self, record): """ 判断是否该滚动日志,如果当前时间对应的日志文件名与当前打开的日志文件名不一致,则需要滚动日志 """ if self.current_filename != self._compute_fn(): # 日期变了,计算新的日志文件 self.current_filename = self._compute_fn() return True elif os.path.getsize(self.current_log_path) > 10485760: # 判断文件是否大于10MB字节数 # 超过10M了,计算新的日志文件 seg = int(self.current_filename.split(".")[-2]) + 1 self.current_filename = self._compute_fn(seg=seg) return True return False def doRollover(self): """ 滚动日志 """ # 关闭旧的日志文件 if self.stream: self.stream.close() self.stream = None # self.current_log_path = self.base_log_path.with_name(self.current_filename) self.current_log_path = self._compute_lp() # 打开新的日志文件 if not self.delay: self.stream = self._open() # 删除过期日志 # self.delete_expired_files() def _compute_lp(self): """ 计算当前时间对应日志的路径 """ current_log_path = self.base_log_path.parent / time.strftime(self.suffix, time.localtime()) if not os.path.exists(current_log_path): os.mkdir(current_log_path) return current_log_path / self.current_filename def _compute_fn(self, seg=0): """ 计算当前时间对应的日志文件名 """ return "ipfcst-forecast" + "." + time.strftime(self.suffix, time.localtime()) + '.' + str(seg) +'.log' def _open(self): """ 打开新的日志文件,同时更新base_filename指向的软链,修改软链不会对日志记录产生任何影响 """ if self.encoding is None: stream = open(str(self.current_log_path), self.mode) else: stream = codecs.open(str(self.current_log_path), self.mode, self.encoding) # # 删除旧的软链 # if self.base_log_path.exists(): # try: # # 如果base_log_path不是软链或者指向的日志文件不对,则先删除该软链 # if not self.base_log_path.is_symlink() or os.readlink(self.base_log_path) != self.current_log_path: # os.remove(self.base_log_path) # except OSError: # pass # # # 建立新的软链 # try: # os.symlink(self.current_log_path, str(self.base_log_path)) # except OSError: # pass return stream def delete_expired_files(self): """ 删除过期的日志 """ if self.backup_count <= 0: return file_names = os.listdir(str(self.base_log_path.parent)) result = [] prefix = self.base_filename + "." plen = len(prefix) for file_name in file_names: if re.match(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", file_name): result.append(file_name) if len(result) < self.backup_count: result = [] else: result.sort() result = result[:len(result) - self.backup_count] import shutil for file_name in result: path = self.base_log_path.with_name(file_name) if os.path.isdir(path): shutil.rmtree(path) class Log(object): def __init__(self): # 定义对应的程序模块名name,默认为root self.logger = logging.getLogger() # 设置输出的等级 LEVELS = {'NOSET': logging.NOTSET, 'DEBUG': logging.DEBUG, 'INFO': logging.INFO, 'WARNING': logging.WARNING, 'ERROR': logging.ERROR, 'CRITICAL': logging.CRITICAL} # 必须设置,这里如果不显示设置,默认过滤掉warning之前的所有级别的信息 self.logger.setLevel(LEVELS['DEBUG']) # 仅为matplotlib设置更高的日志等级(ERROR) matplotlib_logger = logging.getLogger('matplotlib') matplotlib_logger.setLevel(logging.ERROR) # 日志输出格式 self.formatter = logging.Formatter( '%(asctime)s - %(filename)s - %(levelname)s - %(message)s - %(funcName)s') # 输出日志格式 # 创建一个handler, 向文件logname输出日志信息 # fh = logging.FileHandler(self.logname, 'a', encoding='utf-8') # midnight:表示日志文件再每天半夜时分滚动 # interval: 间隔单位的个数,指等待多少个when的时间后 Logger会自动重建新闻继续进行日志记录 # backupCount:表示日志文件的保留个数,假如为30,保留最近30天的日志文件 # fh = logging.handlers.TimedRotatingFileHandler(self.getLogName(), when='midnight', interval=1, backupCount=30, encoding='utf-8') # fh.suffix = "%Y-%m-%d" # # fh.extMatch = r"^\d{4}-\d{2}-\d{2}" # # 设置日志等级 # fh.setLevel(LEVELS['INFO']) # # 设置handler的格式对象 # fh.setFormatter(self.formatter) filename = self.getLogName() dr_fh = DailyRotatingFileHandler(filename, backupCount=100, encoding='utf-8') dr_fh.setFormatter(self.formatter) # 将handler增加到logger中 self.logger.addHandler(dr_fh) # 创建一个StreamHandler,用于输出到控制台 ch = logging.StreamHandler() ch.setLevel(LEVELS['INFO']) ch.setFormatter(self.formatter) self.logger.addHandler(ch) # # 关闭打开的文件 dr_fh.close() def getLogName(self): # log_path是存放日志的路径 # lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'logs')) lib_path = Path(os.path.dirname(__file__)).parent.parent.parent / 'logs' self.logger.info("日志输出路径为:{}".format(lib_path)) # 如果不存在这个logs文件夹,就自动创建一个 if not os.path.exists(lib_path): os.mkdir(lib_path) return lib_path / 'ipfcst_forecast_link.log' if __name__ == "__main__": logger = Log() logger.info("this is info") logger.debug("this is debug") logger.error("this is error") logger.warning("this is warning") logger.critical("critical")