test.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :test.py
  4. # @Time :2025/3/13 14:19
  5. # @Author :David
  6. # @Company: shenyang JY
  7. import paramiko
  8. from datetime import datetime, timedelta
  9. import os
  10. import zipfile
  11. import shutil
  12. import tempfile
  13. # 配置信息
  14. SFTP_HOST = '192.168.1.33'
  15. SFTP_PORT = 2022
  16. SFTP_USERNAME = 'liudawei'
  17. SFTP_PASSWORD = 'liudawei@123'
  18. # 在原配置部分添加以下配置
  19. DEST_SFTP_HOST = 'dest_sftp.example.com'
  20. DEST_SFTP_PORT = 22
  21. DEST_SFTP_USERNAME = 'dest_username'
  22. DEST_SFTP_PASSWORD = 'dest_password'
  23. DEFAULT_TARGET_DIR = 'cdq' # 默认上传目录
  24. # 更新后的三级映射
  25. MAPPINGS = {
  26. 'koi': {('Zone', '1.0'): {'J00645'}},
  27. 'lucky': {}, 'seer': {('lgb', '1.0'): {'J00001'}}
  28. }
  29. def get_next_target_time(current_time=None):
  30. """获取下一个目标时刻"""
  31. if current_time is None:
  32. current_time = datetime.now()
  33. target_hours = [0, 6, 12, 18]
  34. current_hour = current_time.hour
  35. for hour in sorted(target_hours):
  36. if current_hour < hour:
  37. return current_time.replace(hour=hour, minute=0, second=0, microsecond=0)
  38. # 如果当前时间超过所有目标小时,使用次日0点
  39. return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
  40. def download_sftp_files(sftp, mappings, datetime_str, local_temp_dir):
  41. """下载所有需要的SFTP文件"""
  42. for engineer in mappings:
  43. remote_base = f"/{engineer}/" # SFTP根目录下的工程师目录
  44. try:
  45. sftp.chdir(remote_base)
  46. except FileNotFoundError:
  47. print(f"工程师目录不存在: {remote_base}")
  48. continue
  49. for model_version in mappings[engineer]:
  50. # 构造目标文件名(模型版本已经合并)
  51. target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
  52. remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
  53. local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
  54. try:
  55. sftp.get(remote_path, local_path)
  56. print(f"下载成功: {remote_path}")
  57. except Exception as e:
  58. print(f"文件下载失败 {remote_path}: {str(e)}")
  59. def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
  60. """
  61. 封装SFTP连接和文件下载的完整流程
  62. :param mappings: 文件映射配置
  63. :param datetime_str: 日期时间字符串,用于文件名
  64. :param local_temp_dir: 本地临时目录路径
  65. """
  66. transport = None
  67. sftp = None
  68. try:
  69. # 创建SSH传输通道
  70. transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
  71. transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
  72. # 创建SFTP客户端
  73. sftp = paramiko.SFTPClient.from_transport(transport)
  74. # 执行文件下载
  75. for engineer in mappings:
  76. remote_base = f"/{engineer}/"
  77. try:
  78. sftp.chdir(remote_base)
  79. except FileNotFoundError:
  80. print(f"工程师目录不存在: {remote_base}")
  81. continue
  82. for model_version in mappings[engineer]:
  83. target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
  84. remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
  85. local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
  86. try:
  87. sftp.get(remote_path, local_path)
  88. print(f"下载成功: {remote_path} -> {local_path}")
  89. except Exception as e:
  90. print(f"文件下载失败 {remote_path}: {str(e)}")
  91. except paramiko.AuthenticationException:
  92. print("认证失败,请检查用户名和密码")
  93. except paramiko.SSHException as e:
  94. print(f"SSH连接异常: {str(e)}")
  95. except Exception as e:
  96. print(f"未知错误: {str(e)}")
  97. finally:
  98. # 确保连接关闭
  99. if sftp:
  100. sftp.close()
  101. if transport and transport.is_active():
  102. transport.close()
  103. def upload_to_sftp(local_path, target_dir):
  104. """上传文件到目标SFTP服务器"""
  105. transport = None
  106. sftp = None
  107. try:
  108. # 创建新的传输连接
  109. transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
  110. transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
  111. sftp = paramiko.SFTPClient.from_transport(transport)
  112. # 确保目标目录存在
  113. try:
  114. sftp.chdir(target_dir)
  115. except FileNotFoundError:
  116. sftp.mkdir(target_dir)
  117. print(f"已创建远程目录: {target_dir}")
  118. # 构造远程路径
  119. filename = os.path.basename(local_path)
  120. remote_path = f"{target_dir}/{filename}"
  121. # 执行上传
  122. sftp.put(local_path, remote_path)
  123. print(f"成功上传到: {remote_path}")
  124. except Exception as e:
  125. print(f"上传失败: {str(e)}")
  126. raise
  127. finally:
  128. # 确保连接关闭
  129. if sftp:
  130. sftp.close()
  131. if transport and transport.is_active():
  132. transport.close()
  133. def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
  134. """处理所有下载的ZIP文件并收集场站目录"""
  135. for engineer in mappings:
  136. for model_version in mappings[engineer]:
  137. target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
  138. zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
  139. station_codes = mappings[engineer][model_version]
  140. if not os.path.exists(zip_path):
  141. continue
  142. # 创建临时解压目录
  143. with tempfile.TemporaryDirectory() as temp_extract:
  144. # 解压ZIP文件
  145. try:
  146. with zipfile.ZipFile(zip_path, 'r') as zf:
  147. zf.extractall(temp_extract)
  148. except zipfile.BadZipFile:
  149. print(f"无效的ZIP文件: {zip_path}")
  150. continue
  151. # 收集场站目录
  152. for root, dirs, files in os.walk(temp_extract):
  153. for dir_name in dirs:
  154. if dir_name in station_codes:
  155. src = os.path.join(root, dir_name)
  156. dest = os.path.join(final_collect_dir, dir_name)
  157. if not os.path.exists(dest):
  158. shutil.copytree(src, dest)
  159. print(f"已收集场站: {dir_name}")
  160. def create_final_zip(final_collect_dir, datetime_str, output_path="result.zip"):
  161. """创建最终打包的ZIP文件"""
  162. with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
  163. for root, dirs, files in os.walk(final_collect_dir):
  164. for file in files:
  165. file_path = os.path.join(root, file)
  166. arcname = os.path.relpath(file_path, final_collect_dir)
  167. zf.write(file_path, arcname)
  168. print(f"最终打包完成: {output_path}")
  169. def main():
  170. # 创建临时工作目录
  171. with tempfile.TemporaryDirectory() as local_temp_dir:
  172. final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
  173. os.makedirs(final_collect_dir, exist_ok=True)
  174. # 计算目标时间
  175. target_time = get_next_target_time()
  176. datetime_str = target_time.strftime("%Y%m%d%H")
  177. datetime_str = '2025012118'
  178. print(f"目标时间: {datetime_str}")
  179. # 连接SFTP
  180. # transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
  181. # transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
  182. # sftp = paramiko.SFTPClient.from_transport(transport)
  183. # 下载文件
  184. download_files_via_sftp(MAPPINGS, datetime_str, local_temp_dir)
  185. # 关闭SFTP连接
  186. # sftp.close()
  187. # transport.close()
  188. # 处理下载的文件
  189. process_zips(MAPPINGS, local_temp_dir, datetime_str, final_collect_dir)
  190. # 创建最终ZIP
  191. create_final_zip(final_collect_dir, datetime_str)
  192. # 上传打包ZIP文件
  193. upload_to_sftp()
  194. if __name__ == "__main__":
  195. main()