Selaa lähdekoodia

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei 3 kuukautta sitten
vanhempi
commit
68a80f2a45

+ 2 - 17
data_processing/data_operation/data_nwp_ftp.py

@@ -91,21 +91,6 @@ def get_moment_next(schedule_dt=False):
         moment = '06'
     return date, moment
 
-def get_moment(schedule_dt=False):
-    if schedule_dt:
-        now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
-    else:
-        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
-    if now.hour >= 18:
-        moment = '18'
-    elif now.hour >= 12:
-        moment = '12'
-    elif now.hour >= 6:
-        moment = '06'
-    else:
-        moment = '00'
-    return moment
-
 def download_zip_files_from_ftp(moment=None):
     now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
     date = now.strftime("%Y%m%d")
@@ -170,7 +155,7 @@ def select_file_to_mongo(args):
             if csv_weather_path or csv_power_path:
                 break
     if csv_weather_path is False:
-        raise ValueError("获取nwp文件异常:找不到文件")
+        raise ValueError("获取nwp文件异常:找不到场站 {} nwp文件".format(farmId))
     # 使用 pandas 读取 CSV 文件
     weather = pd.read_csv(csv_weather_path)
     power = pd.read_csv(csv_power_path) if csv_power_path else None
@@ -184,7 +169,7 @@ def select_file_to_mongo(args):
         else:
             df = select_dx_from_nwp(weather, args)
             insert_data_into_mongo(df, args)
-            logger.info(f"CSV 文件 {csv_file_power} 或 {csv_file_weather} 在目标目录 {farmId} 中未找到")
+            logger.info(f"CSV 文件 {csv_file_power} 在目标目录 {farmId} 中未找到")
     else:
         if csv_weather_path:
             weather = select_dx_from_nwp(weather, args)

+ 3 - 19
data_processing/data_operation/pre_data_ftp.py

@@ -75,7 +75,7 @@ def zip_temp_file(df, args):
     temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
     date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
     modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
-    csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, date, moment)
+    csv_file = 'jy_{}.{}.{}_{}_{}{}.csv'.format(modeler, model, version, farmId, date, moment)
     csv_path = os.path.join(temp_dir, farmId, csv_file)
     os.makedirs(os.path.dirname(csv_path), exist_ok=True)
     df.to_csv(csv_path, index=False)
@@ -115,7 +115,7 @@ def sftp_upload(sftp, local_file_path, remote_file_path):
     except Exception as e:
         logger.info(f"Failed to upload {local_file_path}. Error: {e}")
 
-def upload_sftp(zip_path, zip_file, args, cdq, dq, zq):
+def upload_sftp(zip_path, zip_file, args):
     ftp_host, ftp_port_dev, ftp_port_prd, ftp_user, ftp_pass = ftp_params['host'], ftp_params['port_dev'], ftp_params['port_prd'], args['user'], ftp_params[args['user']]['password']
     sftp, ssh = sftp_connect(ftp_host, ftp_port_dev, ftp_user, ftp_pass)
     if sftp and ssh:
@@ -123,18 +123,6 @@ def upload_sftp(zip_path, zip_file, args, cdq, dq, zq):
         sftp_upload(sftp, zip_path, '/' + ftp_params[args['user']]['modeler'] + '/'+zip_file)
     # 关闭连接
     sftp_close(sftp, ssh)
-    if cdq:
-        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
-        sftp_upload(sftp, zip_path, '/cdq' + '/'+zip_file)
-        sftp_close(sftp, ssh)
-    if dq:
-        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
-        sftp_upload(sftp, zip_path, '/dq' + '/' + zip_file)
-        sftp_close(sftp, ssh)
-    if zq:
-        sftp, ssh = sftp_connect(ftp_host, ftp_port_prd, ftp_user, ftp_pass)
-        sftp_upload(sftp, zip_path, '/zq' + '/' + zip_file)
-        sftp_close(sftp, ssh)
     shutil.rmtree(os.path.dirname(zip_path))
 
 def upload_ftp(zip_path, zip_file, args):
@@ -176,15 +164,11 @@ def get_nwp_from_ftp():
         df['date_time'] = pd.to_datetime(df['date_time'])
         dfs = df.groupby('farm_id')
         for farm_id, df in dfs:
-            if {'cdq', 'dq', 'zq'}.issubset(set(df.columns.tolist())):
-                cdq, dq, zq = df.loc[df.index[0], ['cdq', 'dq', 'zq']]
-            else:
-                cdq, dq, zq = 0, 0, 0
             df = df.sort_values(by='date_time')[['farm_id', 'date_time', 'power_forecast']]
             # 2. 将预测结果保存成csv临时文件,命名压缩
             zip_path, zip_file = zip_temp_file(df, args)
             # 3. 上传到指定的FTP服务器中
-            upload_sftp(zip_path, zip_file, args, cdq, dq, zq)
+            upload_sftp(zip_path, zip_file, args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()

+ 272 - 9
data_processing/data_operation/pre_prod_ftp.py

@@ -17,23 +17,43 @@
 其中,通过算法工程师 / 模型 版本从测试FTP上获取场站编码集合的步骤如下:
 (1)遍历算法工程师下的 模型 版本 zip文件,根据当前时刻,从FTP上下载, 解压
 (2)根据 模型 版本 下的 场站编码 依次获取,并组装成这个算法工程师的场站编码集合
-(3)合并所有算法工程的场站编码集合,形成类型下的场站编码集合
+(3)合并所有算法工程的场站编码集合,形成类型下的场站编码集合
 3. 压缩成类型(超短期 短期 中期)三个zip文件,上传生产FTP
 """
-from collections import defaultdict
 import requests
 import json
 import os
 import paramiko
 import zipfile
-from io import BytesIO
-from datetime import datetime
+from datetime import datetime, timedelta
+from typing import Optional
+import shutil
+import tempfile
 api_url = 'http://itil.jiayuepowertech.com:9958/itil/api/stationModelConfig'
 nick_name = {
     '0': 'seer',
     '1': 'koi',
     '2': 'lucky'
 }
+model_type_dict = {
+    'cdq': '0',
+    'dq': '1',
+    'zcq': '2'
+}
+# 配置信息
+SFTP_HOST = '192.168.1.33'
+SFTP_PORT = 2022
+SFTP_USERNAME = 'liudawei'
+SFTP_PASSWORD = 'liudawei@123'
+# 在原配置部分添加以下配置
+DEST_SFTP_HOST = '192.168.1.33'
+DEST_SFTP_PORT = 2025
+DEST_SFTP_USERNAME = 'liudawei'
+DEST_SFTP_PASSWORD = 'liudawei@123'
+
+from common.logs import Log
+logger = Log('pre_prod_ftp').logger
+
 def fetch_station_records(model_type, is_current=1):
     """
     调用接口获取场站记录
@@ -42,7 +62,7 @@ def fetch_station_records(model_type, is_current=1):
     :return: 场站记录列表或错误信息
     """
     params = {
-        "paramModelType": str(model_type),
+        "paramModelType": model_type_dict[str(model_type)],
         "paramIsCurrent": str(is_current)  # 适配接口参数格式
     }
 
@@ -56,7 +76,7 @@ def fetch_station_records(model_type, is_current=1):
         return {"error": "接口返回非JSON数据"}
 
 
-def process_station_data(api_data):
+def model_station_handler(api_data):
     """
     处理接口数据,生成三级映射关系
     :param api_data: 接口返回的原始数据(假设为字典列表)
@@ -77,9 +97,252 @@ def process_station_data(api_data):
     return mapping
 
 
+def get_next_target_time(current_time=None):
+    """获取下一个目标时刻"""
+    if current_time is None:
+        current_time = datetime.now()
+
+    target_hours = [0, 6, 12, 18]
+    current_hour = current_time.hour
+
+    for hour in sorted(target_hours):
+        if current_hour < hour:
+            return current_time.replace(hour=hour, minute=0, second=0, microsecond=0)
+
+    # 如果当前时间超过所有目标小时,使用次日0点
+    return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
+
+
+def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
+    """
+    封装SFTP连接和文件下载的完整流程
+    :param mappings: 文件映射配置
+    :param datetime_str: 日期时间字符串,用于文件名
+    :param local_temp_dir: 本地临时目录路径
+    """
+    transport = None
+    sftp = None
+    try:
+        # 创建SSH传输通道
+        transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
+        transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
+
+        # 创建SFTP客户端
+        sftp = paramiko.SFTPClient.from_transport(transport)
+
+        # 执行文件下载
+        for engineer in mappings:
+            datetime_str = datetime_str if engineer == 'koi' else '2025012000'
+            remote_base = f"/{engineer}/"
+            try:
+                sftp.chdir(remote_base)
+            except FileNotFoundError:
+                logger.info(f"工程师目录不存在: {remote_base}")
+                continue
+
+            for model_version in mappings[engineer]:
+                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip"
+                remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
+                local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
+
+                try:
+                    sftp.get(remote_path, local_path)
+                    logger.info(f"下载成功: {remote_path} -> {local_path}")
+                except Exception as e:
+                    logger.info(f"文件下载失败 {remote_path}: {str(e)}")
+
+    except paramiko.AuthenticationException:
+        logger.info("认证失败,请检查用户名和密码")
+    except paramiko.SSHException as e:
+        logger.info(f"SSH连接异常: {str(e)}")
+    except Exception as e:
+        logger.info(f"未知错误: {str(e)}")
+    finally:
+        # 遍历到最后一个中短期,确保连接关闭
+        if model_type == 'zcq':
+            if sftp:
+                sftp.close()
+            if transport and transport.is_active():
+                transport.close()
+
+
+def upload_to_sftp(local_path: str, target_dir: str) -> bool:
+    """上传文件到SFTP服务器
+
+    Args:
+        local_path: 本地文件路径
+        target_dir: 远程目标目录
+
+    Returns:
+        上传是否成功 (True/False)
+    """
+    transport: Optional[paramiko.Transport] = None
+    sftp: Optional[paramiko.SFTPClient] = None
+
+    try:
+        # 验证本地文件存在
+        if not os.path.isfile(local_path):
+            raise FileNotFoundError(f"本地文件不存在: {local_path}")
+
+        # 创建SFTP连接
+        transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
+        transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
+        sftp = paramiko.SFTPClient.from_transport(transport)
+
+        # 执行上传
+        remote_filename = os.path.basename(local_path)
+        remote_path = f"{target_dir}/{remote_filename}"
+        sftp.put(local_path, remote_path)
+        logger.info(f"[SUCCESS] 上传完成: {remote_path}")
+        return True
+
+    except Exception as e:
+        logger.info(f"[ERROR] 上传失败: {str(e)}")
+        return False
+    finally:
+        # 确保资源释放
+        if sftp:
+            sftp.close()
+        if transport and transport.is_active():
+            transport.close()
+
+
+def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
+    """处理所有下载的ZIP文件并收集场站目录"""
+    for engineer in mappings:
+        datetime_str = datetime_str if engineer == 'koi' else '2025012000'
+        for model_version in mappings[engineer]:
+            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip"
+            zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
+            station_codes = mappings[engineer][model_version]
 
+            if not os.path.exists(zip_path):
+                continue
+
+            # 创建临时解压目录
+            with tempfile.TemporaryDirectory() as temp_extract:
+                # 解压ZIP文件
+                try:
+                    with zipfile.ZipFile(zip_path, 'r') as zf:
+                        zf.extractall(temp_extract)
+                except zipfile.BadZipFile:
+                    logger.info(f"无效的ZIP文件: {zip_path}")
+                    continue
+
+                # 收集场站目录
+                for root, dirs, files in os.walk(temp_extract):
+                    for dir_name in dirs:
+                        if dir_name in station_codes:
+                            src = os.path.join(root, dir_name)
+                            dest = os.path.join(final_collect_dir, dir_name)
+
+                            if not os.path.exists(dest):
+                                shutil.copytree(src, dest)
+                                logger.info(f"已收集场站: {dir_name}")
+
+
+def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
+    """创建ZIP压缩包并返回完整路径
+
+    Args:
+        final_collect_dir: 需要打包的源目录
+        datetime_str: 时间戳字符串
+        model_type: 模型类型标识
+
+    Returns:
+        生成的ZIP文件完整路径
+    """
+    ftp_dir = os.path.dirname(os.path.dirname(__file__))
+    output_dir = os.path.join(ftp_dir, 'cache', 'ftp')
+    # 确保缓存目录存在
+    os.makedirs(output_dir, exist_ok=True)
+
+    # 构造标准化文件名
+    zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
+    output_path = os.path.join(output_dir, zip_filename)
+
+    try:
+        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
+            for root, _, files in os.walk(final_collect_dir):
+                for file in files:
+                    file_path = os.path.join(root, file)
+                    arcname = os.path.relpath(file_path, final_collect_dir)
+                    zf.write(file_path, arcname)
+        logger.info(f"[SUCCESS] ZIP创建成功: {output_path}")
+        return output_path
+    except Exception as e:
+        logger.info(f"[ERROR] 创建ZIP失败: {str(e)}")
+        raise
+
+
+def clean_up_file(file_path: str) -> None:
+    """安全删除本地文件"""
+    try:
+        if os.path.exists(file_path):
+            os.remove(file_path)
+            logger.info(f"[CLEANUP] 已删除本地文件: {file_path}")
+    except Exception as e:
+        logger.info(f"[WARNING] 文件删除失败: {str(e)}")
+
+def prod_data_handler(mappings, model_type):
+    # 创建临时工作目录
+    with tempfile.TemporaryDirectory() as local_temp_dir:
+        final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
+        os.makedirs(final_collect_dir, exist_ok=True)
+
+        # 计算目标时间
+        target_time = get_next_target_time()
+        datetime_str = target_time.strftime("%Y%m%d%H")
+        logger.info(f"目标时间: {datetime_str}")
+        datetime_str = '2025012412'
+
+        # 下载文件
+        download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
+
+        # 处理下载的文件
+        process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
+
+        # 创建最终ZIP
+        zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
+
+        # 上传打包ZIP文件
+        if upload_to_sftp(zip_path, f"/{model_type}"):
+            # 步骤3: 上传成功后清理
+            clean_up_file(zip_path)
+        else:
+            logger.info("[WARNING] 上传未成功,保留本地文件")
+
+
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.triggers.cron import CronTrigger
+
+def target_job():
+    """目标方法,实际任务逻辑在此实现"""
+    for model_type in ['cdq', 'dq', 'zcq']:
+        models = fetch_station_records(model_type)
+        mapping = model_station_handler(models['data'])
+        prod_data_handler(mapping, model_type)
+
+def configure_scheduler():
+    # 创建调度器(可设置时区,如 timezone='Asia/Shanghai')
+    scheduler = BlockingScheduler()
+
+    # 配置第一个触发器:处理每个主小时段的前 60 分钟(0-50 分钟)
+    trigger1 = CronTrigger(
+        hour='0,6,12,18',  # 主触发小时
+        minute='0-50/10',  # 每 10 分钟一次,覆盖 00:00-00:50
+        timezone='Asia/Shanghai'  # 按需设置时区
+    )
+
+    # 添加任务到调度器
+    scheduler.add_job(target_job, trigger1)
+
+    # 启动调度器
+    try:
+        logger.info("⏰ pre_prod_ftp:生产ftp定时任务已启动,按 Ctrl+C 退出")
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        logger.info("⏹️ 定时任务已停止")
 
 if __name__ == "__main__":
-    models = fetch_station_records(1)
-    mapping = process_station_data(models['data'])
-    print(mapping)
+    configure_scheduler()

+ 98 - 89
data_processing/data_operation/test.py

@@ -4,9 +4,10 @@
 # @Time      :2025/3/13 14:19
 # @Author    :David
 # @Company: shenyang JY
-
+from enum import Enum
 import paramiko
 from datetime import datetime, timedelta
+from typing import Optional
 import os
 import zipfile
 import shutil
@@ -27,10 +28,9 @@ DEFAULT_TARGET_DIR = 'cdq'  # 默认上传目录
 # 更新后的三级映射
 MAPPINGS = {
     'koi': {('Zone', '1.0'): {'J00645'}},
-    'lucky': {}, 'seer': {('lgb', '1.0'): {'J00001'}}
+    'lucky': {}, 'seer': {('lgb', '1.0'): {'J00578'}}
 }
 
-
 def get_next_target_time(current_time=None):
     """获取下一个目标时刻"""
     if current_time is None:
@@ -47,31 +47,7 @@ def get_next_target_time(current_time=None):
     return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
 
 
-def download_sftp_files(sftp, mappings, datetime_str, local_temp_dir):
-    """下载所有需要的SFTP文件"""
-    for engineer in mappings:
-        remote_base = f"/{engineer}/"  # SFTP根目录下的工程师目录
-        try:
-            sftp.chdir(remote_base)
-        except FileNotFoundError:
-            print(f"工程师目录不存在: {remote_base}")
-            continue
-
-        for model_version in mappings[engineer]:
-            # 构造目标文件名(模型版本已经合并)
-
-            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
-            remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
-            local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
-
-            try:
-                sftp.get(remote_path, local_path)
-                print(f"下载成功: {remote_path}")
-            except Exception as e:
-                print(f"文件下载失败 {remote_path}: {str(e)}")
-
-
-def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
+def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
     """
     封装SFTP连接和文件下载的完整流程
     :param mappings: 文件映射配置
@@ -90,6 +66,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
 
         # 执行文件下载
         for engineer in mappings:
+            datetime_str = datetime_str if engineer == 'koi' else 2025012000
             remote_base = f"/{engineer}/"
             try:
                 sftp.chdir(remote_base)
@@ -98,7 +75,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
                 continue
 
             for model_version in mappings[engineer]:
-                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
+                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip"
                 remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
                 local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
 
@@ -115,42 +92,49 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
     except Exception as e:
         print(f"未知错误: {str(e)}")
     finally:
-        # 确保连接关闭
-        if sftp:
-            sftp.close()
-        if transport and transport.is_active():
-            transport.close()
+        # 遍历到最后一个中短期,确保连接关闭
+        if model_type == 'zcq':
+            if sftp:
+                sftp.close()
+            if transport and transport.is_active():
+                transport.close()
+
+
+def upload_to_sftp(local_path: str, target_dir: str) -> bool:
+    """上传文件到SFTP服务器
+
+    Args:
+        local_path: 本地文件路径
+        target_dir: 远程目标目录
+
+    Returns:
+        上传是否成功 (True/False)
+    """
+    transport: Optional[paramiko.Transport] = None
+    sftp: Optional[paramiko.SFTPClient] = None
 
-def upload_to_sftp(local_path, target_dir):
-    """上传文件到目标SFTP服务器"""
-    transport = None
-    sftp = None
     try:
-        # 创建新的传输连接
+        # 验证本地文件存在
+        if not os.path.isfile(local_path):
+            raise FileNotFoundError(f"本地文件不存在: {local_path}")
+
+        # 创建SFTP连接
         transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
         transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
         sftp = paramiko.SFTPClient.from_transport(transport)
 
-        # 确保目标目录存在
-        try:
-            sftp.chdir(target_dir)
-        except FileNotFoundError:
-            sftp.mkdir(target_dir)
-            print(f"已创建远程目录: {target_dir}")
-
-        # 构造远程路径
-        filename = os.path.basename(local_path)
-        remote_path = f"{target_dir}/{filename}"
-
         # 执行上传
+        remote_filename = os.path.basename(local_path)
+        remote_path = f"{target_dir}/{remote_filename}"
         sftp.put(local_path, remote_path)
-        print(f"成功上传到: {remote_path}")
+        print(f"[SUCCESS] 上传完成: {remote_path}")
+        return True
 
     except Exception as e:
-        print(f"上传失败: {str(e)}")
-        raise
+        print(f"[ERROR] 上传失败: {str(e)}")
+        return False
     finally:
-        # 确保连接关闭
+        # 确保资源释放
         if sftp:
             sftp.close()
         if transport and transport.is_active():
@@ -159,6 +143,7 @@ def upload_to_sftp(local_path, target_dir):
 def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
     """处理所有下载的ZIP文件并收集场站目录"""
     for engineer in mappings:
+        datetime_str = datetime_str if engineer == 'koi' else 2025012000
         for model_version in mappings[engineer]:
            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip"
             zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
@@ -189,51 +174,75 @@ def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
                                 print(f"已收集场站: {dir_name}")
 
 
-def create_final_zip(final_collect_dir, datetime_str, output_path="result.zip"):
-    """创建最终打包的ZIP文件"""
-    with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
-        for root, dirs, files in os.walk(final_collect_dir):
-            for file in files:
-                file_path = os.path.join(root, file)
-                arcname = os.path.relpath(file_path, final_collect_dir)
-                zf.write(file_path, arcname)
-    print(f"最终打包完成: {output_path}")
+def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
+    """创建ZIP压缩包并返回完整路径
 
+    Args:
+        final_collect_dir: 需要打包的源目录
+        datetime_str: 时间戳字符串
+        model_type: 模型类型标识
 
+    Returns:
+        生成的ZIP文件完整路径
+    """
+    # 确保缓存目录存在
+    os.makedirs('../cache/ftp', exist_ok=True)
 
+    # 构造标准化文件名
+    zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
+    output_path = os.path.join('../cache/ftp', zip_filename)
 
-def main():
-    # 创建临时工作目录
-    with tempfile.TemporaryDirectory() as local_temp_dir:
-        final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
-        os.makedirs(final_collect_dir, exist_ok=True)
+    try:
+        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
+            for root, _, files in os.walk(final_collect_dir):
+                for file in files:
+                    file_path = os.path.join(root, file)
+                    arcname = os.path.relpath(file_path, final_collect_dir)
+                    zf.write(file_path, arcname)
+        print(f"[SUCCESS] ZIP创建成功: {output_path}")
+        return output_path
+    except Exception as e:
+        print(f"[ERROR] 创建ZIP失败: {str(e)}")
+        raise
 
-        # 计算目标时间
-        target_time = get_next_target_time()
-        datetime_str = target_time.strftime("%Y%m%d%H")
-        datetime_str = '2025012118'
-        print(f"目标时间: {datetime_str}")
 
-        # 连接SFTP
-        # transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
-        # transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
-        # sftp = paramiko.SFTPClient.from_transport(transport)
+def clean_up_file(file_path: str) -> None:
+    """安全删除本地文件"""
+    try:
+        if os.path.exists(file_path):
+            os.remove(file_path)
+            print(f"[CLEANUP] 已删除本地文件: {file_path}")
+    except Exception as e:
+        print(f"[WARNING] 文件删除失败: {str(e)}")
+
+def prod_data_handler(mappings):
+    # 创建临时工作目录
+    for model_type in ['cdq', 'dq', 'zcq']:
+        with tempfile.TemporaryDirectory() as local_temp_dir:
+            final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
+            os.makedirs(final_collect_dir, exist_ok=True)
 
-        # 下载文件
-        download_files_via_sftp(MAPPINGS, datetime_str, local_temp_dir)
+            # 计算目标时间
+            target_time = get_next_target_time()
+            datetime_str = target_time.strftime("%Y%m%d%H")
+            print(f"目标时间: {datetime_str}")
+            datetime_str = '2025012412'
 
-        # 关闭SFTP连接
-        # sftp.close()
-        # transport.close()
+            # 下载文件
+            download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
 
-        # 处理下载的文件
-        process_zips(MAPPINGS, local_temp_dir, datetime_str, final_collect_dir)
+            # 处理下载的文件
+            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
 
-        # 创建最终ZIP
-        create_final_zip(final_collect_dir, datetime_str)
+            # 创建最终ZIP
+            zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
 
-        # 上传打包ZIP文件
-        upload_to_sftp()
+            # 上传打包ZIP文件
+            if upload_to_sftp(zip_path, f"/{model_type}"):
+                # 步骤3: 上传成功后清理
+                clean_up_file(zip_path)
+            else:
+                print("[WARNING] 上传未成功,保留本地文件")
 
 if __name__ == "__main__":
-    main()
+    prod_data_handler(MAPPINGS)