#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/4 22:28
# file: myLog.py
# author: David
# company: shenyang JY
- """
- 1. 信息流以控制台和文件形式打印
- 2. 文件保存以启动日期为文件名
- 3. 控制台INFO,文件DEBUG
- """
import codecs
import logging
import logging.handlers
import os
import re
import shutil
import time
from pathlib import Path
from logging.handlers import BaseRotatingHandler
class DailyRotatingFileHandler(BaseRotatingHandler):
    """
    Similar to `logging.TimedRotatingFileHandler`, except that this handler:
    - works with multiple processes
    - only rolls over on natural-day boundaries
    - does not support UTC yet
    """
    def __init__(self, filename, backupCount=0, encoding=None, delay=False, utc=False, **kwargs):
        self.backup_count = backupCount
        self.utc = utc
        self.suffix = "%Y-%m-%d"
        self.base_log_path = Path(filename)
        self.base_filename = self.base_log_path.name
        self.current_filename = self._compute_fn()
        self.current_log_path = self._compute_lp()
        BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
    def shouldRollover(self, record):
        """
        Decide whether to roll over: if the file name computed for the current time
        differs from the file currently open, the log must be rolled over.
        """
        if self.current_filename != self._compute_fn():
            # The date has changed; compute the new log file name
            self.current_filename = self._compute_fn()
            return True
        elif os.path.exists(self.current_log_path) and os.path.getsize(self.current_log_path) > 10485760:
            # The current file has grown past 10 MB; move on to the next segment
            seg = int(self.current_filename.split(".")[-2]) + 1
            self.current_filename = self._compute_fn(seg=seg)
            return True
        return False
    def doRollover(self):
        """
        Roll the log over to a new file.
        """
        # Close the old log file
        if self.stream:
            self.stream.close()
            self.stream = None
        # self.current_log_path = self.base_log_path.with_name(self.current_filename)
        self.current_log_path = self._compute_lp()
        # Open the new log file
        if not self.delay:
            self.stream = self._open()
        # Delete expired logs
        # self.delete_expired_files()
    def _compute_lp(self):
        """
        Compute the log file path for the current time.
        """
        current_log_path = self.base_log_path.parent / time.strftime(self.suffix, time.localtime())
        # makedirs with exist_ok avoids a race when several processes create the directory at once
        os.makedirs(current_log_path, exist_ok=True)
        return current_log_path / self.current_filename
    def _compute_fn(self, seg=0):
        """
        Compute the log file name for the current time.
        """
        return "limit-power" + "." + time.strftime(self.suffix, time.localtime()) + '.' + str(seg) + '.log'
    def _open(self):
        """
        Open a new log file. The commented-out block below would also repoint the symlink
        at base_filename; changing the symlink has no effect on the records being written.
        """
        if self.encoding is None:
            stream = open(str(self.current_log_path), self.mode)
        else:
            stream = codecs.open(str(self.current_log_path), self.mode, self.encoding)
        # # Remove the old symlink
        # if self.base_log_path.exists():
        #     try:
        #         # If base_log_path is not a symlink, or it points at the wrong log file, delete it first
        #         if not self.base_log_path.is_symlink() or os.readlink(self.base_log_path) != self.current_log_path:
        #             os.remove(self.base_log_path)
        #     except OSError:
        #         pass
        #
        # # Create the new symlink
        # try:
        #     os.symlink(self.current_log_path, str(self.base_log_path))
        # except OSError:
        #     pass
        return stream
    def delete_expired_files(self):
        """
        Delete expired log directories, keeping only the newest `backup_count` of them.
        """
        if self.backup_count <= 0:
            return
        file_names = os.listdir(str(self.base_log_path.parent))
        result = []
        for file_name in file_names:
            # Daily log directories are named after the date, e.g. "2023-03-04"
            if re.match(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", file_name):
                result.append(file_name)
        if len(result) < self.backup_count:
            result = []
        else:
            result.sort()
            result = result[:len(result) - self.backup_count]
        for file_name in result:
            path = self.base_log_path.with_name(file_name)
            if os.path.isdir(path):
                shutil.rmtree(path)
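# A minimal usage sketch for the handler on its own (the path and format below are
# placeholders, not part of this module); it can be attached like any FileHandler:
#
#   handler = DailyRotatingFileHandler("logs/limit_power_link.log", backupCount=30, encoding="utf-8")
#   handler.setFormatter(logging.Formatter("%(asctime)s - %(levelname)s - %(message)s"))
#   logging.getLogger().addHandler(handler)
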
class Log(object):
    def __init__(self):
        # Name of the logger to use; the root logger is used by default
        self.logger = logging.getLogger()
        # Available output levels
        LEVELS = {'NOTSET': logging.NOTSET,
                  'DEBUG': logging.DEBUG,
                  'INFO': logging.INFO,
                  'WARNING': logging.WARNING,
                  'ERROR': logging.ERROR,
                  'CRITICAL': logging.CRITICAL}
        # Must be set explicitly; otherwise everything below WARNING is filtered out by default
        self.logger.setLevel(LEVELS['DEBUG'])
        # Raise the level for matplotlib only (ERROR)
        matplotlib_logger = logging.getLogger('matplotlib')
        matplotlib_logger.setLevel(logging.ERROR)
        # Log output format
        self.formatter = logging.Formatter(
            '%(asctime)s - %(filename)s - %(levelname)s - %(message)s - %(funcName)s')
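        # With this format, a record looks roughly like:
        # 2023-03-04 22:28:00,123 - myLog.py - INFO - this is info - <module>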
        # Create a handler that writes log records to the log file
        # fh = logging.FileHandler(self.logname, 'a', encoding='utf-8')
        # midnight: roll the log file over at midnight every day
        # interval: number of `when` units to wait before the logger creates a new file and keeps logging
        # backupCount: number of log files to keep; 30 means the most recent 30 days are retained
        # fh = logging.handlers.TimedRotatingFileHandler(self.getLogName(), when='midnight', interval=1, backupCount=30, encoding='utf-8')
        # fh.suffix = "%Y-%m-%d"
        # # fh.extMatch = r"^\d{4}-\d{2}-\d{2}"
        # # Set the log level
        # fh.setLevel(LEVELS['INFO'])
        # # Set the handler's formatter
        # fh.setFormatter(self.formatter)
        filename = self.getLogName()
        dr_fh = DailyRotatingFileHandler(filename, backupCount=100, encoding='utf-8')
        dr_fh.setFormatter(self.formatter)
        # Add the file handler to the logger
        self.logger.addHandler(dr_fh)
        # Create a StreamHandler for console output
        ch = logging.StreamHandler()
        ch.setLevel(LEVELS['INFO'])
        ch.setFormatter(self.formatter)
        self.logger.addHandler(ch)
    def getLogName(self):
        # lib_path is the directory where log files are stored
        # lib_path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'logs'))
        lib_path = Path(os.path.dirname(__file__)).parent / 'logs'
        self.logger.info("Log output path: {}".format(lib_path))
        # Create the logs directory if it does not exist yet
        os.makedirs(lib_path, exist_ok=True)
        return lib_path / 'limit_power_link.log'
if __name__ == "__main__":
    # Log itself does not expose info()/debug(); use the configured logger it holds
    log = Log()
    logger = log.logger
    logger.info("this is info")
    logger.debug("this is debug")
    logger.error("this is error")
    logger.warning("this is warning")
    logger.critical("critical")
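
# A possible way to use this from another module (assuming the file is importable as
# `myLog`; the module name is not dictated by this file):
#
#   from myLog import Log
#   logger = Log().logger
#   logger.debug("goes to the daily file only")
#   logger.info("goes to both the console and the daily file")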