123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :test.py
- # @Time :2025/3/13 14:19
- # @Author :David
- # @Company: shenyang JY
- from enum import Enum
- import paramiko
- from datetime import datetime, timedelta
- from typing import Optional
- import os
- import zipfile
- import shutil
- import tempfile
# Connection settings for the source SFTP server (files are downloaded from here).
# NOTE(review): real-looking credentials are committed as defaults below. Each
# value can be overridden with an environment variable of the same name;
# consider rotating the password and removing the literal defaults.
SFTP_HOST = os.environ.get('SFTP_HOST', '192.168.1.33')
SFTP_PORT = int(os.environ.get('SFTP_PORT', '2022'))
SFTP_USERNAME = os.environ.get('SFTP_USERNAME', 'liudawei')
SFTP_PASSWORD = os.environ.get('SFTP_PASSWORD', 'liudawei@123')

# Connection settings for the destination SFTP server (the packaged ZIP is
# uploaded here). Overridable through the environment in the same way.
DEST_SFTP_HOST = os.environ.get('DEST_SFTP_HOST', 'dest_sftp.example.com')
DEST_SFTP_PORT = int(os.environ.get('DEST_SFTP_PORT', '22'))
DEST_SFTP_USERNAME = os.environ.get('DEST_SFTP_USERNAME', 'dest_username')
DEST_SFTP_PASSWORD = os.environ.get('DEST_SFTP_PASSWORD', 'dest_password')
DEFAULT_TARGET_DIR = 'cdq'  # default upload directory

# Three-level mapping: engineer -> (model, version) -> set of station codes.
MAPPINGS = {
    'koi': {('Zone', '1.0'): {'J00645'}},
    'lucky': {},
    'seer': {('lgb', '1.0'): {'J00578'}},
}
def get_next_target_time(current_time=None, target_hours=(0, 6, 12, 18)):
    """Return the next scheduled target time after ``current_time``.

    The result is ``current_time`` moved to the first hour in
    ``target_hours`` that is strictly greater than ``current_time.hour``,
    with minutes, seconds and microseconds set to zero. If no such hour
    exists on the same day, the earliest target hour of the following day
    is returned.

    Args:
        current_time: Reference ``datetime``; defaults to ``datetime.now()``.
        target_hours: Iterable of hours (0-23) at which a target occurs.

    Returns:
        A ``datetime`` for the next target hour.

    Raises:
        ValueError: If ``target_hours`` is empty.
    """
    if current_time is None:
        current_time = datetime.now()

    hours = sorted(set(target_hours))
    if not hours:
        raise ValueError("target_hours must contain at least one hour")

    for hour in hours:
        if current_time.hour < hour:
            return current_time.replace(hour=hour, minute=0, second=0, microsecond=0)

    # Past every target hour today: roll over to the first slot of tomorrow.
    next_day = current_time + timedelta(days=1)
    return next_day.replace(hour=hours[0], minute=0, second=0, microsecond=0)
def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
    """Connect to the source SFTP server and download the mapped ZIP files.

    For every engineer in ``mappings`` and every ``(model, version)`` key
    under it, the file ``/<engineer>/jy_<engineer>.<model>.<version>_<ts>.zip``
    is fetched into ``local_temp_dir``. Connection and download errors are
    printed, not raised.

    Args:
        mappings: engineer -> {(model, version): station codes} mapping.
        datetime_str: Timestamp string used in the file name.
        local_temp_dir: Local directory that receives the downloads.
        model_type: Kept for interface compatibility. It no longer controls
            whether the connection is closed; the connection opened by this
            call is always closed before returning.
    """
    transport = None
    sftp = None
    try:
        # Open the SSH transport and an SFTP session on top of it.
        transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
        transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)

        for engineer in mappings:
            # Use a per-engineer local instead of rebinding ``datetime_str``;
            # otherwise the first non-'koi' engineer would change the
            # timestamp for every engineer that follows it.
            # NOTE(review): non-'koi' engineers use the fixed timestamp
            # 2025012000 - looks like a debugging value; confirm.
            file_datetime = datetime_str if engineer == 'koi' else 2025012000
            remote_base = f"/{engineer}/"
            try:
                sftp.chdir(remote_base)
            except FileNotFoundError:
                print(f"工程师目录不存在: {remote_base}")
                continue

            for model_version in mappings[engineer]:
                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{file_datetime}.zip"
                # remote_base already ends with '/', so plain concatenation
                # yields a POSIX path on every platform.
                remote_path = f"{remote_base}{target_file}"
                local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
                try:
                    sftp.get(remote_path, local_path)
                    print(f"下载成功: {remote_path} -> {local_path}")
                except Exception as e:
                    print(f"文件下载失败 {remote_path}: {str(e)}")
                    # A failed get() can leave an empty or partial local
                    # file behind; remove it so it is not mistaken for a
                    # real download later.
                    try:
                        if os.path.exists(local_path):
                            os.remove(local_path)
                    except OSError:
                        pass
    except paramiko.AuthenticationException:
        print("认证失败,请检查用户名和密码")
    except paramiko.SSHException as e:
        print(f"SSH连接异常: {str(e)}")
    except Exception as e:
        print(f"未知错误: {str(e)}")
    finally:
        # Each call opens its own connection, so each call must close it.
        if sftp:
            sftp.close()
        if transport:
            transport.close()
def upload_to_sftp(local_path: str, target_dir: str) -> bool:
    """Upload a local file to the destination SFTP server.

    The file keeps its base name and is written under ``target_dir``.
    Every failure (missing local file, connection or transfer error) is
    printed and reported through the return value rather than raised.

    Args:
        local_path: Path of the local file to upload.
        target_dir: Remote directory that receives the file.

    Returns:
        True if the upload succeeded, False otherwise.
    """
    transport: Optional[paramiko.Transport] = None
    sftp: Optional[paramiko.SFTPClient] = None
    try:
        # Check the local file before opening any network connection.
        if not os.path.isfile(local_path):
            raise FileNotFoundError(f"本地文件不存在: {local_path}")

        transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
        transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)

        remote_filename = os.path.basename(local_path)
        # Strip trailing slashes so "/cdq/" does not yield "/cdq//file.zip".
        remote_path = f"{target_dir.rstrip('/')}/{remote_filename}"
        sftp.put(local_path, remote_path)
        print(f"[SUCCESS] 上传完成: {remote_path}")
        return True
    except Exception as e:
        print(f"[ERROR] 上传失败: {str(e)}")
        return False
    finally:
        # Always release the SFTP session and the underlying transport.
        if sftp:
            sftp.close()
        if transport:
            transport.close()
def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
    """Extract the downloaded ZIP files and collect the mapped station folders.

    For every engineer and ``(model, version)`` key in ``mappings``, the
    matching archive in ``local_temp_dir`` is extracted to a temporary
    directory. Each directory whose name is one of that key's station codes
    is copied to ``final_collect_dir/<station code>``. A station that has
    already been collected is not overwritten. Missing archives are skipped
    silently; invalid archives are reported and skipped.

    Args:
        mappings: engineer -> {(model, version): set of station codes}.
        local_temp_dir: Directory holding the downloaded archives.
        datetime_str: Timestamp string used in the archive names.
        final_collect_dir: Directory that receives the station folders.
    """
    for engineer in mappings:
        # Use a per-engineer local instead of rebinding ``datetime_str``;
        # otherwise the first non-'koi' engineer would change the timestamp
        # for every engineer that follows it.
        # NOTE(review): non-'koi' engineers use the fixed timestamp
        # 2025012000 - looks like a debugging value; confirm.
        file_datetime = datetime_str if engineer == 'koi' else 2025012000

        for model_version, station_codes in mappings[engineer].items():
            stem = f"jy_{engineer}.{'.'.join(model_version)}_{file_datetime}"
            # NOTE(review): this function originally looked only for
            # "<stem>_dq.zip", while download_files_via_sftp saves
            # "<stem>.zip". Both names are accepted here so downloaded
            # files are actually found; confirm the intended naming.
            candidates = [
                os.path.join(local_temp_dir, f"{stem}{suffix}.zip").replace("\\", "/")
                for suffix in ("_dq", "")
            ]
            zip_path = next((path for path in candidates if os.path.exists(path)), None)
            if zip_path is None:
                continue

            # Extract into a scratch directory that is removed afterwards.
            with tempfile.TemporaryDirectory() as temp_extract:
                try:
                    with zipfile.ZipFile(zip_path, 'r') as zf:
                        zf.extractall(temp_extract)
                except zipfile.BadZipFile:
                    print(f"无效的ZIP文件: {zip_path}")
                    continue

                # Collect station directories found at any depth.
                for root, dirs, _ in os.walk(temp_extract):
                    for dir_name in dirs:
                        if dir_name not in station_codes:
                            continue
                        dest = os.path.join(final_collect_dir, dir_name)
                        if os.path.exists(dest):
                            continue
                        shutil.copytree(os.path.join(root, dir_name), dest)
                        print(f"已收集场站: {dir_name}")
def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str,
                     output_dir: str = '../cache/ftp') -> str:
    """Pack ``final_collect_dir`` into a ZIP archive and return its path.

    The archive is named ``jy_algo_<datetime_str>_<model_type>.zip`` and is
    written to ``output_dir``, which is created if needed. Entries are
    stored relative to ``final_collect_dir``. Only files are added, so
    empty directories do not appear in the archive.

    Args:
        final_collect_dir: Source directory to pack.
        datetime_str: Timestamp string used in the archive name.
        model_type: Model type tag used in the archive name.
        output_dir: Directory that receives the archive. The default is
            relative to the current working directory.

    Returns:
        Path of the created ZIP file.

    Raises:
        Exception: Any error raised while writing the archive is printed
            and re-raised; the incomplete archive is removed first.
    """
    os.makedirs(output_dir, exist_ok=True)
    zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
    output_path = os.path.join(output_dir, zip_filename)
    output_abs = os.path.abspath(output_path)

    try:
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            for root, _, files in os.walk(final_collect_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    # Never add the archive to itself when output_dir lies
                    # inside the directory being packed.
                    if os.path.abspath(file_path) == output_abs:
                        continue
                    arcname = os.path.relpath(file_path, final_collect_dir)
                    zf.write(file_path, arcname)
    except Exception as e:
        print(f"[ERROR] 创建ZIP失败: {str(e)}")
        # Do not leave a truncated archive behind for a later upload.
        try:
            if os.path.exists(output_path):
                os.remove(output_path)
        except OSError:
            pass
        raise

    print(f"[SUCCESS] ZIP创建成功: {output_path}")
    return output_path
def clean_up_file(file_path: str) -> None:
    """Delete a local file without ever raising.

    A missing file is ignored silently. Any other failure is printed as a
    warning, because cleanup is best-effort and must not abort the caller.

    Args:
        file_path: Path of the file to delete.
    """
    try:
        # Remove directly instead of checking existence first; this avoids
        # a race between the check and the removal.
        os.remove(file_path)
    except FileNotFoundError:
        return
    except Exception as e:
        print(f"[WARNING] 文件删除失败: {str(e)}")
    else:
        print(f"[CLEANUP] 已删除本地文件: {file_path}")
def prod_data_handler(mappings=None, datetime_str=None):
    """Run the download -> collect -> package -> upload pipeline.

    For each model type in ``['cdq', 'dq', 'zcq']`` the source archives are
    downloaded into a fresh temporary directory, the mapped station folders
    are collected, a final ZIP is created and uploaded to ``/<model_type>``
    on the destination server. The local ZIP is deleted only after a
    successful upload.

    Args:
        mappings: engineer -> {(model, version): station codes} mapping.
            Defaults to the module-level ``MAPPINGS``.
        datetime_str: Timestamp string (``%Y%m%d%H``) used in file names.
            Defaults to the next target time from
            ``get_next_target_time()``. The computed value used to be
            overwritten by the literal '2025012412'; pass that value
            explicitly to reproduce the old fixed run.
    """
    if mappings is None:
        mappings = MAPPINGS
    if datetime_str is None:
        # Compute once so that all model types share the same timestamp,
        # even if the run crosses a target-hour boundary.
        datetime_str = get_next_target_time().strftime("%Y%m%d%H")
    print(f"目标时间: {datetime_str}")

    for model_type in ['cdq', 'dq', 'zcq']:
        # Temporary working directory, removed at the end of each iteration.
        with tempfile.TemporaryDirectory() as local_temp_dir:
            final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
            os.makedirs(final_collect_dir, exist_ok=True)

            download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
            zip_path = create_final_zip(final_collect_dir, datetime_str, model_type)

            # Keep the local archive if the upload failed so it can be retried.
            if upload_to_sftp(zip_path, f"/{model_type}"):
                clean_up_file(zip_path)
            else:
                print("[WARNING] 上传未成功,保留本地文件")
if __name__ == "__main__":
    # prod_data_handler takes the mapping table as an argument; calling it
    # with no argument raised TypeError.
    prod_data_handler(MAPPINGS)
|