#!/usr/bin/env python # -*- coding:utf-8 -*- # @FileName :test.py # @Time :2025/3/13 14:19 # @Author :David # @Company: shenyang JY from enum import Enum import paramiko from datetime import datetime, timedelta from typing import Optional import os import zipfile import shutil import tempfile # 配置信息 SFTP_HOST = '192.168.1.33' SFTP_PORT = 2022 SFTP_USERNAME = 'liudawei' SFTP_PASSWORD = 'liudawei@123' # 在原配置部分添加以下配置 DEST_SFTP_HOST = 'dest_sftp.example.com' DEST_SFTP_PORT = 22 DEST_SFTP_USERNAME = 'dest_username' DEST_SFTP_PASSWORD = 'dest_password' DEFAULT_TARGET_DIR = 'cdq' # 默认上传目录 # 更新后的三级映射 MAPPINGS = { 'koi': {('Zone', '1.0'): {'J00645'}}, 'lucky': {}, 'seer': {('lgb', '1.0'): {'J00578'}} } def get_next_target_time(current_time=None): """获取下一个目标时刻""" if current_time is None: current_time = datetime.now() target_hours = [0, 6, 12, 18] current_hour = current_time.hour for hour in sorted(target_hours): if current_hour < hour: return current_time.replace(hour=hour, minute=0, second=0, microsecond=0) # 如果当前时间超过所有目标小时,使用次日0点 return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0) def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type): """ 封装SFTP连接和文件下载的完整流程 :param mappings: 文件映射配置 :param datetime_str: 日期时间字符串,用于文件名 :param local_temp_dir: 本地临时目录路径 """ transport = None sftp = None try: # 创建SSH传输通道 transport = paramiko.Transport((SFTP_HOST, SFTP_PORT)) transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD) # 创建SFTP客户端 sftp = paramiko.SFTPClient.from_transport(transport) # 执行文件下载 for engineer in mappings: datetime_str = datetime_str if engineer == 'koi' else 2025012000 remote_base = f"/{engineer}/" try: sftp.chdir(remote_base) except FileNotFoundError: print(f"工程师目录不存在: {remote_base}") continue for model_version in mappings[engineer]: target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip" remote_path = os.path.join(remote_base, target_file).replace("\\", "/") local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/") try: sftp.get(remote_path, local_path) print(f"下载成功: {remote_path} -> {local_path}") except Exception as e: print(f"文件下载失败 {remote_path}: {str(e)}") except paramiko.AuthenticationException: print("认证失败,请检查用户名和密码") except paramiko.SSHException as e: print(f"SSH连接异常: {str(e)}") except Exception as e: print(f"未知错误: {str(e)}") finally: # 遍历到最后一个中短期,确保连接关闭 if model_type == 'zcq': if sftp: sftp.close() if transport and transport.is_active(): transport.close() def upload_to_sftp(local_path: str, target_dir: str) -> bool: """上传文件到SFTP服务器 Args: local_path: 本地文件路径 target_dir: 远程目标目录 Returns: 上传是否成功 (True/False) """ transport: Optional[paramiko.Transport] = None sftp: Optional[paramiko.SFTPClient] = None try: # 验证本地文件存在 if not os.path.isfile(local_path): raise FileNotFoundError(f"本地文件不存在: {local_path}") # 创建SFTP连接 transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT)) transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD) sftp = paramiko.SFTPClient.from_transport(transport) # 执行上传 remote_filename = os.path.basename(local_path) remote_path = f"{target_dir}/{remote_filename}" sftp.put(local_path, remote_path) print(f"[SUCCESS] 上传完成: {remote_path}") return True except Exception as e: print(f"[ERROR] 上传失败: {str(e)}") return False finally: # 确保资源释放 if sftp: sftp.close() if transport and transport.is_active(): transport.close() def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir): """处理所有下载的ZIP文件并收集场站目录""" for engineer in mappings: datetime_str = datetime_str if engineer == 'koi' else 2025012000 for model_version in mappings[engineer]: target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip" zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/") station_codes = mappings[engineer][model_version] if not os.path.exists(zip_path): continue # 创建临时解压目录 with tempfile.TemporaryDirectory() as temp_extract: # 解压ZIP文件 try: with zipfile.ZipFile(zip_path, 'r') as zf: zf.extractall(temp_extract) except zipfile.BadZipFile: print(f"无效的ZIP文件: {zip_path}") continue # 收集场站目录 for root, dirs, files in os.walk(temp_extract): for dir_name in dirs: if dir_name in station_codes: src = os.path.join(root, dir_name) dest = os.path.join(final_collect_dir, dir_name) if not os.path.exists(dest): shutil.copytree(src, dest) print(f"已收集场站: {dir_name}") def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str: """创建ZIP压缩包并返回完整路径 Args: final_collect_dir: 需要打包的源目录 datetime_str: 时间戳字符串 model_type: 模型类型标识 Returns: 生成的ZIP文件完整路径 """ # 确保缓存目录存在 os.makedirs('../cache/ftp', exist_ok=True) # 构造标准化文件名 zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip" output_path = os.path.join('../cache/ftp', zip_filename) try: with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf: for root, _, files in os.walk(final_collect_dir): for file in files: file_path = os.path.join(root, file) arcname = os.path.relpath(file_path, final_collect_dir) zf.write(file_path, arcname) print(f"[SUCCESS] ZIP创建成功: {output_path}") return output_path except Exception as e: print(f"[ERROR] 创建ZIP失败: {str(e)}") raise def clean_up_file(file_path: str) -> None: """安全删除本地文件""" try: if os.path.exists(file_path): os.remove(file_path) print(f"[CLEANUP] 已删除本地文件: {file_path}") except Exception as e: print(f"[WARNING] 文件删除失败: {str(e)}") def prod_data_handler(mappings): # 创建临时工作目录 for model_type in ['cdq', 'dq', 'zcq']: with tempfile.TemporaryDirectory() as local_temp_dir: final_collect_dir = os.path.join(local_temp_dir, 'collected_stations') os.makedirs(final_collect_dir, exist_ok=True) # 计算目标时间 target_time = get_next_target_time() datetime_str = target_time.strftime("%Y%m%d%H") print(f"目标时间: {datetime_str}") datetime_str = '2025012412' # 下载文件 download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type) # 处理下载的文件 process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir) # 创建最终ZIP zip_path = create_final_zip(final_collect_dir, datetime_str, model_type) # 上传打包ZIP文件 if upload_to_sftp(zip_path, f"/{model_type}"): # 步骤3: 上传成功后清理 clean_up_file(zip_path) else: print("[WARNING] 上传未成功,保留本地文件") if __name__ == "__main__": prod_data_handler()