Преглед на файлове

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei преди 5 месеца
родител
ревизия
7d07a2e39c
променени са 3 файла, в които са добавени 64 реда и са изтрити 30 реда
  1. 1 1
      common/logs.py
  2. 49 23
      data_processing/data_operation/data_nwp_ftp.py
  3. 14 6
      data_processing/data_operation/pre_data_ftp.py

+ 1 - 1
common/logs.py

@@ -147,7 +147,7 @@ class Log(object):
                   'CRITICAL': logging.CRITICAL}
 
         # 必须设置,这里如果不显示设置,默认过滤掉warning之前的所有级别的信息
-        self.logger.setLevel(LEVELS['DEBUG'])
+        self.logger.setLevel(LEVELS['INFO'])
 
         # 仅为matplotlib设置更高的日志等级(ERROR)
         matplotlib_logger = logging.getLogger('matplotlib')

+ 49 - 23
data_processing/data_operation/data_nwp_ftp.py

@@ -28,7 +28,7 @@ def update_thread():
 def start_jobs():  # configures and starts a BackgroundScheduler that runs download_zip_files_from_ftp on an interval
     scheduler = BackgroundScheduler()
     scheduler.configure({'timezone': timezone("Asia/Shanghai")})  # scheduler timezone is set to Asia/Shanghai
-    scheduler.add_job(func=download_zip_files_from_ftp, trigger="interval", seconds=300)  # old: job fired every 300 seconds
+    scheduler.add_job(func=download_zip_files_from_ftp, trigger="interval", seconds=900)  # new: job fires every 900 seconds
     scheduler.start()
 
 def match_date(date, filename):
@@ -67,6 +67,29 @@ def delete_zip_files(date):
                     except OSError as e:
                         print(f"Error deleting file {csv_file_path}: {e.strerror}")
 
+def get_moment_next(schedule_dt=False):  # maps an hour of day to one of the strings '00', '06', '12', '18'
+    if schedule_dt:
+        now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')  # explicit timestamp, parsed as a naive datetime
+    else:
+        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))  # no timestamp given: current time converted to Asia/Shanghai
+    if now.hour == 18:
+        moment = '18'
+    elif now.hour > 18:  # hours 19-23; NOTE(review): only the hour string is returned, callers pair it with their own date - confirm '00' here is meant for the same calendar day
+        moment = '00'
+    elif now.hour == 12:
+        moment = '12'
+    elif now.hour > 12:  # hours 13-17
+        moment = '18'
+    elif now.hour == 6:
+        moment = '06'
+    elif now.hour > 6:  # hours 7-11
+        moment = '12'
+    elif 2 >= now.hour >= 0:  # hours 0-2
+        moment = '00'
+    else:  # remaining hours 3-5
+        moment = '06'
+    return moment
+
 def get_moment(schedule_dt=False):
     if schedule_dt:
         now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
@@ -87,34 +110,36 @@ def download_zip_files_from_ftp(moment=None):
     date = now.strftime("%Y%m%d")
     date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
     if moment is None:
-        moment = get_moment()
+        moment = get_moment_next()
     host = 'xxl'
     ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
     zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
     zip_file_path = []
-    if len(glob.glob(os.path.join(local_dir, zip_extension))) == 0:
-        # 连接到FTP服务器
-        with FTP(ftp_host) as ftp:
-            ftp.login(user=ftp_user, passwd=ftp_password)
+    # 连接到FTP服务器
+    with FTP(ftp_host) as ftp:
+        ftp.login(user=ftp_user, passwd=ftp_password)
 
-            # 切换到远程目录
-            ftp.cwd(remote_dir)
+        # 切换到远程目录
+        ftp.cwd(remote_dir)
 
-            # 获取远程目录中的文件和目录列表
-            files = ftp.nlst()
+        # 获取远程目录中的文件和目录列表
+        files = ftp.nlst()
 
-            # 遍历文件列表,找到ZIP文件并下载
-            for file_name in files:
-                if fnmatch.fnmatch(file_name, zip_extension):
-                    remote_file_path = os.path.join(remote_dir, file_name)
-                    local_file_path = os.path.join(local_dir, file_name)
+        # 遍历文件列表,找到ZIP文件并下载
+        for file_name in files:
+            if fnmatch.fnmatch(file_name, zip_extension):
+                remote_file_path = os.path.join(remote_dir, file_name)
+                local_file_path = os.path.join(local_dir, file_name)
 
-                    with open(local_file_path, 'wb') as local_file:
-                        logger.info(f"Downloading {remote_file_path} to {local_file_path}")
-                        ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
+                if os.path.isfile(local_file_path):
+                    continue
 
-                    logger.info(f"Downloaded {file_name}")
-                    zip_file_path.append(local_file_path)
+                with open(local_file_path, 'wb') as local_file:
+                    logger.info(f"Downloading {remote_file_path} to {local_file_path}")
+                    ftp.retrbinary(f'RETR {remote_file_path}', local_file.write)
+
+                logger.info(f"Downloaded {file_name}")
+                zip_file_path.append(local_file_path)
     # 解压 ZIP 文件到临时目录
     for zip_file_p in zip_file_path:
         with zipfile.ZipFile(zip_file_p, 'r') as zip_ref:
@@ -123,7 +148,7 @@ def download_zip_files_from_ftp(moment=None):
     delete_zip_files(date_2)
 
 def select_file_to_mongo(args):
-    date, moment, farmId, isDq = args['dt'], get_moment(args['dt']), args['farmId'], args['isDq']
+    date, moment, farmId, isDq = args['dt'], get_moment_next(args['dt']), args['farmId'], args['isDq']
     date = datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S').strftime("%Y%m%d")
     csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
     csv_file_weather = csv_file_format.replace('*', 'weather')
@@ -143,9 +168,10 @@ def select_file_to_mongo(args):
                     logger.info("找到power:{}".format(csv_power_path))
             if csv_weather_path or csv_power_path:
                 break
-
+    if csv_weather_path is False:
+        raise ValueError("获取nwp文件异常:找不到文件")
     # 使用 pandas 读取 CSV 文件
-    weather = pd.read_csv(csv_weather_path) if csv_weather_path else ValueError("获取nwp文件异常")
+    weather = pd.read_csv(csv_weather_path)
     power = pd.read_csv(csv_power_path) if csv_power_path else None
     if isDq:
         if csv_weather_path and csv_power_path:

+ 14 - 6
data_processing/data_operation/pre_data_ftp.py

@@ -33,19 +33,27 @@ ftp_params = {
     }
 }
 
-def get_moment(schedule_dt=False):
+def get_moment_next(schedule_dt=False):  # renamed from get_moment; maps an hour of day to '00', '06', '12' or '18'
     if schedule_dt:
         now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')  # explicit timestamp, parsed as a naive datetime
     else:
         now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))  # no timestamp given: current time converted to Asia/Shanghai
-    if now.hour >= 18:
+    if now.hour == 18:
         moment = '18'
-    elif now.hour >= 12:
+    elif now.hour > 18:  # hours 19-23 (the removed version returned '18' for these)
+        moment = '00'
+    elif now.hour == 12:
         moment = '12'
-    elif now.hour >= 6:
+    elif now.hour > 12:  # hours 13-17 (the removed version returned '12')
+        moment = '18'
+    elif now.hour == 6:
         moment = '06'
-    else:
+    elif now.hour > 6:  # hours 7-11 (the removed version returned '06')
+        moment = '12'
+    elif 2 >= now.hour >= 0:  # hours 0-2 (unchanged result '00')
         moment = '00'
+    else:  # hours 3-5 (the removed version returned '00')
+        moment = '06'
     return moment
 
 def zip_temp_file(df, args):
@@ -59,7 +67,7 @@ def zip_temp_file(df, args):
     temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
     current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
     dt = current_time.strftime('%Y%m%d')
-    moment = get_moment() if args.get('dt') is None else get_moment(args.get('dt'))
+    moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
     modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
     csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, dt, moment)
     csv_path = os.path.join(temp_dir, farmId, csv_file)