David 3 months ago
parent
commit
9083c026e6

+ 2 - 17
data_processing/data_operation/data_nwp_ftp.py

@@ -91,21 +91,6 @@ def get_moment_next(schedule_dt=False):
         moment = '06'
     return date, moment
 
-def get_moment(schedule_dt=False):
-    if schedule_dt:
-        now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
-    else:
-        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
-    if now.hour >= 18:
-        moment = '18'
-    elif now.hour >= 12:
-        moment = '12'
-    elif now.hour >= 6:
-        moment = '06'
-    else:
-        moment = '00'
-    return moment
-
 def download_zip_files_from_ftp(moment=None):
     now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
     date = now.strftime("%Y%m%d")
@@ -170,7 +155,7 @@ def select_file_to_mongo(args):
             if csv_weather_path or csv_power_path:
                 break
     if csv_weather_path is False:
-        raise ValueError("获取nwp文件异常:找不到文件")
+        raise ValueError("获取nwp文件异常:找不到场站 {} nwp文件".format(farmId))
     # 使用 pandas 读取 CSV 文件
     weather = pd.read_csv(csv_weather_path)
     power = pd.read_csv(csv_power_path) if csv_power_path else None
@@ -184,7 +169,7 @@ def select_file_to_mongo(args):
         else:
             df = select_dx_from_nwp(weather, args)
             insert_data_into_mongo(df, args)
-            logger.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farmId} 中未找到")
+            logger.info(f"CSV 文件 {csv_file_power} 在目标目录 {farmId} 中未找到")
     else:
         if csv_weather_path:
             weather = select_dx_from_nwp(weather, args)

+ 3 - 19
data_processing/data_operation/pre_data_ftp.py

@@ -75,7 +75,7 @@ def zip_temp_file(df, args):
     temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
     date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
     modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
-    csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, date, moment)
+    csv_file = 'jy_{}.{}.{}_{}_{}{}.csv'.format(modeler, model, version, farmId, date, moment)
     csv_path = os.path.join(temp_dir, farmId, csv_file)
     os.makedirs(os.path.dirname(csv_path), exist_ok=True)
     df.to_csv(csv_path, index=False)
@@ -115,7 +115,7 @@ def sftp_upload(sftp, local_file_path, remote_file_path):
     except Exception as e:
         logger.info(f"Failed to upload {local_file_path}. Error: {e}")
 
-def upload_sftp(zip_path, zip_file, args, cdq, dq, zq):
+def upload_sftp(zip_path, zip_file, args):
     ftp_host, ftp_port_dev, ftp_port_prd, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port_dev'], ftp_params['port_prd'], args['user'], ftp_params[args['user']]['password']
     sftp, ssh = sftp_connect(ftp_host, ftp_port_dev, ftp_user, ftp_pass)
     if sftp and ssh:
@@ -123,18 +123,6 @@ def upload_sftp(zip_path, zip_file, args, cdq, dq, zq):
         sftp_upload(sftp, zip_path, '/' + ftp_params[args['user']]['modeler'] + '/'+zip_file)
     # 关闭连接
     sftp_close(sftp, ssh)
-    if cdq:
-        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
-        sftp_upload(sftp, zip_path, '/cdq' + '/'+zip_file)
-        sftp_close(sftp, ssh)
-    if dq:
-        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
-        sftp_upload(sftp, zip_path, '/dq' + '/' + zip_file)
-        sftp_close(sftp, ssh)
-    if zq:
-        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
-        sftp_upload(sftp, zip_path, '/zq' + '/' + zip_file)
-        sftp_close(sftp, ssh)
     shutil.rmtree(os.path.dirname(zip_path))
 
 def upload_ftp(zip_path, zip_file, args):
@@ -176,15 +164,11 @@ def get_nwp_from_ftp():
         df['date_time'] = pd.to_datetime(df['date_time'])
         dfs = df.groupby('farm_id')
         for farm_id, df in dfs:
-            if {'cdq', 'dq', 'zq'}.issubset(set(df.columns.tolist())):
-                cdq, dq, zq = df.loc[df.index[0], ['cdq', 'dq', 'zq']]
-            else:
-                cdq, dq, zq = 0, 0, 0
             df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
             # 2. 将预测结果保存成csv临时文件,命名压缩
             zip_path, zip_file = zip_temp_file(df, args)
             # 3. 上传到指定的FTP服务器中
-            upload_sftp(zip_path, zip_file, args, cdq, dq, zq)
+            upload_sftp(zip_path, zip_file, args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()

+ 28 - 25
data_processing/data_operation/pre_prod_ftp.py

@@ -17,7 +17,7 @@
 其中,通过算法工程师 / 模型 版本从测试FTP上获取场站编码集合的步骤如下:
 (1)遍历算法工程师下的 模型 版本 zip文件,根据当前时刻,从FTP上下载, 解压
 (2)根据 模型 版本 下的 场站编码 依次获取,并组装成这个算法工程师的场站编码集合
-(3)合并所有算法工程的场站编码集合,形成类型下的场站编码集合
+(3)合并所有算法工程的场站编码集合,形成类型下的场站编码集合
 3. 压缩成类型(超短期 短期 中期)三个zip文件,上传生产FTP
 """
 import requests
@@ -46,12 +46,13 @@ SFTP_PORT = 2022
 SFTP_USERNAME = 'liudawei'
 SFTP_PASSWORD = 'liudawei@123'
 # 在原配置部分添加以下配置
-DEST_SFTP_HOST = 'dest_sftp.example.com'
-DEST_SFTP_PORT = 22
-DEST_SFTP_USERNAME = 'dest_username'
-DEST_SFTP_PASSWORD = 'dest_password'
-DEFAULT_TARGET_DIR = 'cdq'  # 默认上传目录
+DEST_SFTP_HOST = '192.168.1.33'
+DEST_SFTP_PORT = 2025
+DEST_SFTP_USERNAME = 'liudawei'
+DEST_SFTP_PASSWORD = 'liudawei@123'
 
+from common.logs import Log
+logger = Log('pre_prod_ftp').logger
 
 def fetch_station_records(model_type, is_current=1):
     """
@@ -136,7 +137,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
             try:
                 sftp.chdir(remote_base)
             except FileNotFoundError:
-                print(f"工程师目录不存在: {remote_base}")
+                logger.info(f"工程师目录不存在: {remote_base}")
                 continue
 
             for model_version in mappings[engineer]:
@@ -146,16 +147,16 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
 
                 try:
                     sftp.get(remote_path, local_path)
-                    print(f"下载成功: {remote_path} -> {local_path}")
+                    logger.info(f"下载成功: {remote_path} -> {local_path}")
                 except Exception as e:
                     print(f"文件下载失败 {remote_path}: {str(e)}")
 
     except paramiko.AuthenticationException:
-        print("认证失败,请检查用户名和密码")
+        logger.info("认证失败,请检查用户名和密码")
     except paramiko.SSHException as e:
-        print(f"SSH连接异常: {str(e)}")
+        logger.info(f"SSH连接异常: {str(e)}")
     except Exception as e:
-        print(f"未知错误: {str(e)}")
+        logger.info(f"未知错误: {str(e)}")
     finally:
         # 遍历到最后一个中短期,确保连接关闭
         if model_type == 'zcq':
@@ -192,11 +193,11 @@ def upload_to_sftp(local_path: str, target_dir: str) -> bool:
         remote_filename = os.path.basename(local_path)
         remote_path = f"{target_dir}/{remote_filename}"
         sftp.put(local_path, remote_path)
-        print(f"[SUCCESS] 上传完成: {remote_path}")
+        logger.info(f"[SUCCESS] 上传完成: {remote_path}")
         return True
 
     except Exception as e:
-        print(f"[ERROR] 上传失败: {str(e)}")
+        logger.info(f"[ERROR] 上传失败: {str(e)}")
         return False
     finally:
         # 确保资源释放
@@ -225,7 +226,7 @@ def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
                     with zipfile.ZipFile(zip_path, 'r') as zf:
                         zf.extractall(temp_extract)
                 except zipfile.BadZipFile:
-                    print(f"无效的ZIP文件: {zip_path}")
+                    logger.info(f"无效的ZIP文件: {zip_path}")
                     continue
 
                 # 收集场站目录
@@ -237,7 +238,7 @@ def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
 
                             if not os.path.exists(dest):
                                 shutil.copytree(src, dest)
-                                print(f"已收集场站: {dir_name}")
+                                logger.info(f"已收集场站: {dir_name}")
 
 
 def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
@@ -251,12 +252,14 @@ def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str)
     Returns:
         生成的ZIP文件完整路径
     """
+    ftp_dir = os.path.dirname(os.path.dirname(__file__))
+    output_dir = os.path.join(ftp_dir, 'cache', 'ftp')
     # 确保缓存目录存在
-    os.makedirs('../cache/ftp', exist_ok=True)
+    os.makedirs(output_dir, exist_ok=True)
 
     # 构造标准化文件名
     zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
-    output_path = os.path.join('../cache/ftp', zip_filename)
+    output_path = os.path.join(output_dir, zip_filename)
 
     try:
         with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
@@ -265,10 +268,10 @@ def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str)
                     file_path = os.path.join(root, file)
                     arcname = os.path.relpath(file_path, final_collect_dir)
                     zf.write(file_path, arcname)
-        print(f"[SUCCESS] ZIP创建成功: {output_path}")
+        logger.info(f"[SUCCESS] ZIP创建成功: {output_path}")
         return output_path
     except Exception as e:
-        print(f"[ERROR] 创建ZIP失败: {str(e)}")
+        logger.info(f"[ERROR] 创建ZIP失败: {str(e)}")
         raise
 
 
@@ -277,9 +280,9 @@ def clean_up_file(file_path: str) -> None:
     try:
         if os.path.exists(file_path):
             os.remove(file_path)
-            print(f"[CLEANUP] 已删除本地文件: {file_path}")
+            logger.info(f"[CLEANUP] 已删除本地文件: {file_path}")
     except Exception as e:
-        print(f"[WARNING] 文件删除失败: {str(e)}")
+        logger.info(f"[WARNING] 文件删除失败: {str(e)}")
 
 def prod_data_handler(mappings, model_type):
     # 创建临时工作目录
@@ -290,7 +293,7 @@ def prod_data_handler(mappings, model_type):
         # 计算目标时间
         target_time = get_next_target_time()
         datetime_str = target_time.strftime("%Y%m%d%H")
-        print(f"目标时间: {datetime_str}")
+        logger.info(f"目标时间: {datetime_str}")
         datetime_str = '2025012412'
 
         # 下载文件
@@ -307,7 +310,7 @@ def prod_data_handler(mappings, model_type):
             # 步骤3: 上传成功后清理
             clean_up_file(zip_path)
         else:
-            print("[WARNING] 上传未成功,保留本地文件")
+            logger.info("[WARNING] 上传未成功,保留本地文件")
 
 
 from apscheduler.schedulers.blocking import BlockingScheduler
@@ -336,10 +339,10 @@ def configure_scheduler():
 
     # 启动调度器
     try:
-        print("⏰ 定时任务已启动,按 Ctrl+C 退出")
+        logger.info("⏰ pre_prod_ftp:生产ftp定时任务已启动,按 Ctrl+C 退出")
         scheduler.start()
     except (KeyboardInterrupt, SystemExit):
-        print("⏹️ 定时任务已停止")
+        logger.info("⏹️ 定时任务已停止")
 
 if __name__ == "__main__":
     configure_scheduler()