David пре 4 месеци
родитељ
комит
971b63df58
2 измењених фајлова са 243 додато и 30 уклоњено
  1. 204 0
      common/logs.py
  2. 39 30
      data_processing/data_operation/data_nwp_ftp.py

+ 204 - 0
common/logs.py

@@ -0,0 +1,204 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/3/4 22:28
+# file: myLog.py
+# author: David
+# company: shenyang JY
+
+"""
+1. 信息流以控制台和文件形式打印
+2. 文件保存以启动日期为文件名
+3. 控制台INFO,文件DEBUG
+"""
+import codecs
+from pathlib import Path
+import logging, logging.handlers, time, os, re
+from logging.handlers import BaseRotatingHandler
+
+
+class DailyRotatingFileHandler(BaseRotatingHandler):
+    """
+    同`logging.TimedRotatingFileHandler`类似,不过这个handler:
+    - 可以支持多进程
+    - 只支持自然日分割
+    - 暂不支持UTC
+    """
+
+    def __init__(self, filename, logs_name, backupCount=0, encoding=None, delay=False, utc=False, **kwargs):
+        self.backup_count = backupCount
+        self.utc = utc
+        self.suffix = "%Y-%m-%d"
+        self.base_log_path = Path(filename)
+        self.base_filename = self.base_log_path.name
+        self.current_filename = self._compute_fn()
+        self.current_log_path = self._compute_lp()
+        self.logs_name = logs_name
+        BaseRotatingHandler.__init__(self, filename, 'a', encoding, delay)
+
+    def shouldRollover(self, record):
+        """
+        判断是否该滚动日志,如果当前时间对应的日志文件名与当前打开的日志文件名不一致,则需要滚动日志
+        """
+        if self.current_filename != self._compute_fn():
+            # 日期变了,计算新的日志文件
+            self.current_filename = self._compute_fn()
+            return True
+        elif os.path.getsize(self.current_log_path) > 10485760:     # 判断文件是否大于10MB字节数
+            # 超过10M了,计算新的日志文件
+            seg = int(self.current_filename.split(".")[-2]) + 1
+            self.current_filename = self._compute_fn(seg=seg)
+            return True
+        return False
+
+    def doRollover(self):
+        """
+        滚动日志
+        """
+        # 关闭旧的日志文件
+        if self.stream:
+            self.stream.close()
+            self.stream = None
+
+        self.current_log_path = self._compute_lp()
+        # 打开新的日志文件
+        if not self.delay:
+            self.stream = self._open()
+
+        # 删除过期日志
+        self.delete_expired_files()
+
+    def _compute_lp(self):
+        """
+        计算当前时间对应日志的路径
+        """
+        current_log_path = self.base_log_path.parent / time.strftime(self.suffix, time.localtime())
+        if not os.path.exists(current_log_path):
+            os.mkdir(current_log_path)
+        return current_log_path / self.current_filename
+
+    def _compute_fn(self, seg=0):
+        """
+        计算当前时间对应的日志文件名
+        """
+
+        return f"ap-{self.logs_name}" + "." + time.strftime(self.suffix, time.localtime()) + '.' + str(seg) +'.log'
+
+    def _open(self):
+        """
+        打开新的日志文件,同时更新base_filename指向的软链,修改软链不会对日志记录产生任何影响
+        """
+        if self.encoding is None:
+            stream = open(str(self.current_log_path), self.mode)
+        else:
+            stream = codecs.open(str(self.current_log_path), self.mode, self.encoding)
+
+        # 删除旧的软链
+        if self.base_log_path.exists():
+            try:
+                # 如果base_log_path不是软链或者指向的日志文件不对,则先删除该软链
+                if not self.base_log_path.is_symlink() or os.readlink(self.base_log_path) != self.current_log_path:
+                    os.remove(self.base_log_path)
+            except OSError:
+                pass
+
+        # 建立新的软链
+        try:
+            os.symlink(self.current_log_path, str(self.base_log_path))
+        except OSError:
+            pass
+        return stream
+
+    def delete_expired_files(self):
+        """
+        删除过期的日志
+        """
+        if self.backup_count <= 0:
+            return
+
+        file_names = os.listdir(str(self.base_log_path.parent))
+        result = []
+        prefix = self.base_filename + "."
+        plen = len(prefix)
+        for file_name in file_names:
+            if re.match(r"^\d{4}-\d{2}-\d{2}(\.\w+)?$", file_name):
+                result.append(file_name)
+        if len(result) < self.backup_count:
+            result = []
+        else:
+            result.sort()
+            result = result[:len(result) - self.backup_count]
+        import shutil
+        for file_name in result:
+            path = self.base_log_path.with_name(file_name)
+            if path.is_dir():
+                shutil.rmtree(path)
+
+
+class Log(object):
+    def __init__(self, logs_name):
+        # 定义对应的程序模块名name,默认为root
+        self.logger = logging.getLogger()
+        # 设置输出的等级
+        LEVELS = {'NOSET': logging.NOTSET,
+                  'DEBUG': logging.DEBUG,
+                  'INFO': logging.INFO,
+                  'WARNING': logging.WARNING,
+                  'ERROR': logging.ERROR,
+                  'CRITICAL': logging.CRITICAL}
+
+        # 必须设置,这里如果不显示设置,默认过滤掉warning之前的所有级别的信息
+        self.logger.setLevel(LEVELS['DEBUG'])
+
+        # 仅为matplotlib设置更高的日志等级(ERROR)
+        matplotlib_logger = logging.getLogger('matplotlib')
+        matplotlib_logger.setLevel(logging.ERROR)
+
+        # 日志输出格式
+        self.formatter = logging.Formatter(
+            '%(asctime)s - %(filename)s - %(levelname)s - %(message)s - %(funcName)s')  # 输出日志格式
+
+        # 创建一个handler, 向文件logname输出日志信息
+        # fh = logging.FileHandler(self.logname, 'a', encoding='utf-8')
+        # midnight:表示日志文件再每天半夜时分滚动
+        # interval: 间隔单位的个数,指等待多少个when的时间后 Logger会自动重建新闻继续进行日志记录
+        # backupCount:表示日志文件的保留个数,假如为30,保留最近30天的日志文件
+        # fh = logging.handlers.TimedRotatingFileHandler(self.getLogName(), when='midnight', interval=1, backupCount=30, encoding='utf-8')
+        # fh.suffix = "%Y-%m-%d"
+        # # fh.extMatch = r"^\d{4}-\d{2}-\d{2}"
+        # # 设置日志等级
+        # fh.setLevel(LEVELS['INFO'])
+        # # 设置handler的格式对象
+        # fh.setFormatter(self.formatter)
+        filename = self.getLogName()
+        dr_fh = DailyRotatingFileHandler(filename, logs_name, backupCount=300, encoding='utf-8')
+        dr_fh.setFormatter(self.formatter)
+        # 将handler增加到logger中
+        self.logger.addHandler(dr_fh)
+
+        # 创建一个StreamHandler,用于输出到控制台
+        ch = logging.StreamHandler()
+        ch.setLevel(LEVELS['INFO'])
+        ch.setFormatter(self.formatter)
+        self.logger.addHandler(ch)
+
+        # # 关闭打开的文件
+        dr_fh.close()
+
+    def getLogName(self):
+        # log_path是存放日志的路径
+        lib_path = Path('/syjy/logs')
+        self.logger.info("日志输出路径为:{}".format(lib_path))
+        # 如果不存在这个logs文件夹,就自动创建一个
+        if not os.path.exists(lib_path):
+            os.mkdir(lib_path)
+        return lib_path / 'algorithm_platform_link.log'
+
+
+
+if __name__ == "__main__":
+    logger = Log(logs_name='xxx')
+    logger.info("this is info")
+    logger.debug("this is debug")
+    logger.error("this is error")
+    logger.warning("this is warning")
+    logger.critical("critical")

+ 39 - 30
data_processing/data_operation/data_nwp_ftp.py

@@ -11,10 +11,11 @@ import pandas as pd
 from pytz import timezone
 from flask import Flask,request,jsonify
 import time, datetime, os, traceback, re
-import logging, zipfile, tempfile, shutil, fnmatch
+import zipfile, tempfile, shutil, fnmatch
 from common.database_dml import insert_data_into_mongo
 from apscheduler.schedulers.background import BackgroundScheduler
-
+from common.logs import Log
+logger = Log('data-processing').logger
 
 app = Flask('data_nwp_ftp——service')
 
@@ -27,10 +28,7 @@ def update_thread():
 def start_jobs():
     scheduler = BackgroundScheduler()
     scheduler.configure({'timezone': timezone("Asia/Shanghai")})
-    scheduler.add_job(func=download_zip_files_from_ftp, args=['00'], trigger="cron", hour=0, minute=0)
-    scheduler.add_job(func=download_zip_files_from_ftp, args=['06'], trigger="cron", hour=6, minute=0)
-    scheduler.add_job(func=download_zip_files_from_ftp, args=['12'], trigger="cron", hour=12, minute=0)
-    scheduler.add_job(func=download_zip_files_from_ftp, args=['18'], trigger="cron", hour=18, minute=0)
+    scheduler.add_job(func=download_zip_files_from_ftp, trigger="interval", seconds=300)
     scheduler.start()
 
 def match_date(date, filename):
@@ -69,10 +67,20 @@ def delete_zip_files(date):
                     except OSError as e:
                         print(f"Error deleting file {csv_file_path}: {e.strerror}")
 
-def download_zip_files_from_ftp(hour):
-    date = datetime.datetime.now().strftime("%Y%m%d")
-    date_2 = (datetime.datetime.now() - timedelta(days=2)).strftime("%Y%m%d")
-    host, moment  = 'xxl', hour
+def download_zip_files_from_ftp(moment=None):
+    now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+    date = now.strftime("%Y%m%d")
+    date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
+    if moment is None:
+        if now.hour >= 18:
+            moment = '18'
+        elif now.hour >= 12:
+            moment = '12'
+        elif now.hour >= 6:
+            moment = '06'
+        else:
+            moment = '00'
+    host = 'xxl'
     ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
     zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
     zip_file_path = []
@@ -93,10 +101,10 @@ def download_zip_files_from_ftp(hour):
                 local_file_path = os.path.join(local_dir, file_name)
 
                 with open(local_file_path, 'wb') as local_file:
-                    logging.info(f"Downloading {remote_file_path} to {local_file_path}")
+                    logger.info(f"Downloading {remote_file_path} to {local_file_path}")
                     ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
 
-                logging.info(f"Downloaded {file_name}")
+                logger.info(f"Downloaded {file_name}")
                 zip_file_path.append(local_file_path)
     # 解压 ZIP 文件到临时目录
     for zip_file_p in zip_file_path:
@@ -136,7 +144,7 @@ def select_file_to_mongo(args):
             df = select_dx_from_nwp(weather_power, args)
             insert_data_into_mongo(df, args)
         else:
-            logging.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farmId} 中未找到")
+            logger.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farmId} 中未找到")
     else:
         if csv_weather_path:
             weather = select_dx_from_nwp(weather, args)
@@ -144,7 +152,7 @@ def select_file_to_mongo(args):
             df = select_dx_from_nwp(weather, args)
             insert_data_into_mongo(df, args)
         else:
-            logging.info(f"CSV 文件 {csv_file_weather} 在目标目录 {farmId} 中未找到")
+            logger.info(f"CSV 文件 {csv_file_weather} 在目标目录 {farmId} 中未找到")
 
 
 def select_dx_from_nwp(df, args):
@@ -177,11 +185,12 @@ def get_nwp_from_ftp():
     start_time = time.time()
     result = {}
     success = 0
-    print("Program starts execution!")
+    args = {}
+    # print("data_nwp_ftp starts execution!")
     try:
         args = request.values.to_dict()
         # 1. 获取参数:日期,数据源,时刻,D0-9,场站ID,存储的 mongo 和表
-        print('args', args)
+        # print('args', args)
         logger.info(args)
         # 2. 连接FTP,从FTP服务器中获取指定参数的压缩文件(定时任务)
         # 3. 解压压缩文件,将其存储到mongo中
@@ -191,32 +200,32 @@ def get_nwp_from_ftp():
         my_exception = traceback.format_exc()
         my_exception.replace("\n", "\t")
         result['msg'] = my_exception
+        logger.info("生产,获取原始nwp出错:{}".format(my_exception))
     end_time = time.time()
     result['success'] = success
     result['args'] = args
     result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
     result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
-    print("Program execution ends!")
+    # print("Program execution ends!")
     return result
 
 
 if __name__ == "__main__":
     print("Program starts execution!")
-    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    logger = logging.getLogger("data_nwp_ftp")
     from waitress import serve
     update_thread() #定时任务开启
-    current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
-    current_hour = current_time.hour
-    if current_hour >= 18:
-        current_hour = '18'
-    elif current_hour >= 12:
-        current_hour = '12'
-    elif current_hour >= 6:
-        current_hour = '06'
-    else:
-        current_hour = '00'
-    threading.Thread(target=download_zip_files_from_ftp, args=(current_hour,)).start()
+    # current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+    # current_hour = current_time.hour
+    # if current_hour >= 18:
+    #     current_hour = '18'
+    # elif current_hour >= 12:
+    #     current_hour = '12'
+    # elif current_hour >= 6:
+    #     current_hour = '06'
+    # else:
+    #     current_hour = '00'
+    current_hour = '06' # 默认首次运行下载06时刻的zip
+    threading.Thread(target=download_zip_files_from_ftp, kwargs={'moment': current_hour}).start()
     serve(app, host="0.0.0.0", port=10102)
     print("server start!")