pre_prod_ftp.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :pre_prod_ftp.py
  4. # @Time :2025/3/4 13:02
  5. # @Author :David
  6. # @Company: shenyang JY
  7. """
  8. 要实现的功能:
  9. 1. 获取场站-配置接口,根据:
  10. 类型(超短期 短期 中期)
  11. 算法工程师 / 模型 版本 / 场站编码
  12. 获取所有当前配置的场站模型
  13. 2. 根据场站模型配置和时刻,形成:
  14. 类型(超短期 短期 中期)
  15. 场站编码集合
  16. 其中,通过算法工程师 / 模型 版本从测试FTP上获取场站编码集合的步骤如下:
  17. (1)遍历算法工程师下的 模型 版本 zip文件,根据当前时刻,从FTP上下载, 解压
  18. (2)根据 模型 版本 下的 场站编码 依次获取,并组装成这个算法工程师的场站编码集合
  19. (3)合并所有算法工程师的场站编码集合,形成类型下的场站编码集合
  20. 3. 压缩成类型(超短期 短期 中期)三个zip文件,上传生产FTP
  21. """
  22. import requests
  23. import json
  24. import os
  25. import paramiko
  26. import zipfile, traceback
  27. from flask import Flask, request, jsonify
  28. from datetime import datetime, timedelta
  29. from typing import Optional
  30. from pytz import timezone
  31. import shutil, time
  32. import tempfile
  33. app = Flask('pre_prod_ftp——service')
  34. api_url = 'http://itil.jiayuepowertech.com:9958/itil/api/stationModelConfig'
  35. nick_name = {
  36. '0': 'seer',
  37. '1': 'koi',
  38. '2': 'lucky'
  39. }
  40. model_type_dict = {
  41. 'cdq': '0',
  42. 'dq': '1',
  43. 'zcq': '2'
  44. }
  45. # 配置信息
  46. SFTP_HOST = '192.168.1.33'
  47. SFTP_PORT = 2022
  48. SFTP_USERNAME = 'liudawei'
  49. SFTP_PASSWORD = 'liudawei@123'
  50. # 在原配置部分添加以下配置
  51. DEST_SFTP_HOST = '192.168.1.33'
  52. DEST_SFTP_PORT = 2025
  53. DEST_SFTP_USERNAME = 'liudawei'
  54. DEST_SFTP_PASSWORD = 'liudawei@123'
  55. from common.logs import Log
  56. logger = Log('pre_prod_ftp').logger
  57. def fetch_station_records(model_type, is_current=1):
  58. """
  59. 调用接口获取场站记录
  60. :paramModelType: 模型类型 0 超短期 1 短期 2 中期
  61. :paramIsCurrent: 模型启动状态(如 1 或 0)
  62. :return: 场站记录列表或错误信息
  63. """
  64. params = {
  65. "paramModelType": model_type_dict[str(model_type)],
  66. "paramIsCurrent": str(is_current) # 适配接口参数格式
  67. }
  68. try:
  69. response = requests.get(api_url, params=params, timeout=10)
  70. response.raise_for_status() # 检查HTTP错误
  71. return response.json() # 假设接口返回JSON
  72. except requests.exceptions.RequestException as e:
  73. return {"error": f"请求失败: {str(e)}"}
  74. except json.JSONDecodeError:
  75. return {"error": "接口返回非JSON数据"}
  76. def model_station_handler(api_data):
  77. """
  78. 处理接口数据,生成三级映射关系
  79. :param api_data: 接口返回的原始数据(假设为字典列表)
  80. :return: 可用于生成表格的结构化数据
  81. """
  82. # 创建映射字典
  83. mapping = {"lucky":{}, "seer":{}, "koi":{}}
  84. # 遍历每条场站记录
  85. for record in api_data:
  86. # 提取关键字段(根据实际接口字段名称修改)
  87. engineer = nick_name.get(record.get("engineerName"), "unknown")
  88. model_name = record.get("modelName")
  89. model_version = record.get("modelVersion")
  90. station_code = record.get("stationCode")
  91. assert engineer in mapping
  92. if all([engineer, model_name, model_version, station_code]):
  93. mapping[engineer].setdefault((model_name, model_version), set()).add(station_code)
  94. return mapping
  95. def get_next_target_time(current_time=None):
  96. """获取下一个目标时刻"""
  97. if current_time is None:
  98. current_time = datetime.now(timezone('Asia/Shanghai'))
  99. target_hours = [0, 6, 12, 18]
  100. current_hour = current_time.hour
  101. for hour in sorted(target_hours):
  102. if current_hour <= hour:
  103. return current_time.replace(hour=hour, minute=0, second=0, microsecond=0)
  104. # 如果当前时间超过所有目标小时,使用次日0点
  105. return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
  106. def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
  107. """
  108. 封装SFTP连接和文件下载的完整流程
  109. :param mappings: 文件映射配置
  110. :param datetime_str: 日期时间字符串,用于文件名
  111. :param local_temp_dir: 本地临时目录路径
  112. """
  113. download_success = False
  114. transport = None
  115. sftp = None
  116. try:
  117. # 创建SSH传输通道
  118. transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
  119. transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
  120. # 创建SFTP客户端
  121. sftp = paramiko.SFTPClient.from_transport(transport)
  122. # 执行文件下载
  123. for engineer in mappings:
  124. remote_base = f"/{engineer}/"
  125. try:
  126. sftp.chdir(remote_base)
  127. except FileNotFoundError:
  128. logger.info(f"工程师目录不存在: {remote_base}")
  129. continue
  130. for model_version in mappings[engineer]:
  131. target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_{model_type}.zip"
  132. remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
  133. local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
  134. try:
  135. sftp.get(remote_path, local_path)
  136. download_success = True
  137. logger.info(f"下载成功: {remote_path} -> {local_path}")
  138. except Exception as e:
  139. logger.info(f"文件下载失败 {remote_path}: {str(e)}")
  140. except paramiko.AuthenticationException:
  141. logger.info("认证失败,请检查用户名和密码")
  142. except paramiko.SSHException as e:
  143. logger.info(f"SSH连接异常: {str(e)}")
  144. except Exception as e:
  145. logger.info(f"未知错误: {str(e)}")
  146. finally:
  147. # 遍历到最后一个中短期,确保连接关闭
  148. if model_type == 'zcq':
  149. if sftp:
  150. sftp.close()
  151. if transport and transport.is_active():
  152. transport.close()
  153. return download_success
  154. def upload_to_sftp(local_path: str, target_dir: str) -> bool:
  155. """上传文件到SFTP服务器
  156. Args:
  157. local_path: 本地文件路径
  158. target_dir: 远程目标目录
  159. Returns:
  160. 上传是否成功 (True/False)
  161. """
  162. transport: Optional[paramiko.Transport] = None
  163. sftp: Optional[paramiko.SFTPClient] = None
  164. try:
  165. # 验证本地文件存在
  166. if not os.path.isfile(local_path):
  167. raise FileNotFoundError(f"本地文件不存在: {local_path}")
  168. # 创建SFTP连接
  169. transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
  170. transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
  171. sftp = paramiko.SFTPClient.from_transport(transport)
  172. # 执行上传
  173. remote_filename = os.path.basename(local_path)
  174. remote_path = f"{target_dir}/{remote_filename}"
  175. sftp.put(local_path, remote_path)
  176. logger.info(f"[SUCCESS] 上传完成: {remote_path}")
  177. return True
  178. except Exception as e:
  179. logger.info(f"[ERROR] 上传失败: {str(e)}")
  180. return False
  181. finally:
  182. # 确保资源释放
  183. if sftp:
  184. sftp.close()
  185. if transport and transport.is_active():
  186. transport.close()
  187. def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir, model_type):
  188. """处理所有下载的ZIP文件并收集场站目录"""
  189. for engineer in mappings:
  190. for model_version in mappings[engineer]:
  191. target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_{model_type}.zip"
  192. zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
  193. station_codes = mappings[engineer][model_version]
  194. if not os.path.exists(zip_path):
  195. continue
  196. # 创建临时解压目录
  197. with tempfile.TemporaryDirectory() as temp_extract:
  198. # 解压ZIP文件
  199. try:
  200. with zipfile.ZipFile(zip_path, 'r') as zf:
  201. zf.extractall(temp_extract)
  202. except zipfile.BadZipFile:
  203. logger.info(f"无效的ZIP文件: {zip_path}")
  204. continue
  205. # 收集场站目录
  206. for root, dirs, files in os.walk(temp_extract):
  207. for dir_name in dirs:
  208. if dir_name in station_codes:
  209. src = os.path.join(root, dir_name)
  210. dest = os.path.join(final_collect_dir, dir_name)
  211. if not os.path.exists(dest):
  212. shutil.copytree(src, dest)
  213. logger.info(f"已收集场站: {dir_name}")
  214. def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
  215. """创建ZIP压缩包并返回完整路径
  216. Args:
  217. final_collect_dir: 需要打包的源目录
  218. datetime_str: 时间戳字符串
  219. model_type: 模型类型标识
  220. Returns:
  221. 生成的ZIP文件完整路径
  222. """
  223. ftp_dir = os.path.dirname(os.path.dirname(__file__))
  224. output_dir = os.path.join(ftp_dir, 'cache', 'ftp')
  225. # 确保缓存目录存在
  226. os.makedirs(output_dir, exist_ok=True)
  227. # 构造标准化文件名
  228. zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
  229. output_path = os.path.join(output_dir, zip_filename)
  230. try:
  231. with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
  232. for root, _, files in os.walk(final_collect_dir):
  233. for file in files:
  234. file_path = os.path.join(root, file)
  235. arcname = os.path.relpath(file_path, final_collect_dir)
  236. zf.write(file_path, arcname)
  237. logger.info(f"[SUCCESS] ZIP创建成功: {output_path}")
  238. return output_path
  239. except Exception as e:
  240. logger.info(f"[ERROR] 创建ZIP失败: {str(e)}")
  241. raise
  242. def clean_up_file(file_path: str) -> None:
  243. """安全删除本地文件"""
  244. try:
  245. if os.path.exists(file_path):
  246. os.remove(file_path)
  247. logger.info(f"[CLEANUP] 已删除本地文件: {file_path}")
  248. except Exception as e:
  249. logger.info(f"[WARNING] 文件删除失败: {str(e)}")
  250. def prod_data_handler(mappings, model_type):
  251. # 创建临时工作目录
  252. with tempfile.TemporaryDirectory() as local_temp_dir:
  253. final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
  254. os.makedirs(final_collect_dir, exist_ok=True)
  255. # 计算目标时间
  256. target_time = get_next_target_time()
  257. datetime_str = target_time.strftime("%Y%m%d%H")
  258. logger.info(f"目标时间: {datetime_str}")
  259. # 下载文件
  260. if download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
  261. # 处理下载的文件
  262. process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir, model_type)
  263. # 创建最终ZIP
  264. zip_path = create_final_zip(final_collect_dir, datetime_str, model_type)
  265. # 上传打包ZIP文件
  266. if upload_to_sftp(zip_path, f"/{model_type}"):
  267. # 步骤3: 上传成功后清理
  268. clean_up_file(zip_path)
  269. else:
  270. logger.info("[WARNING] 上传未成功,保留本地文件")
  271. from apscheduler.schedulers.blocking import BlockingScheduler
  272. from apscheduler.triggers.cron import CronTrigger
  273. def target_job():
  274. """目标方法,实际任务逻辑在此实现"""
  275. for model_type in ['cdq', 'dq', 'zcq']:
  276. models = fetch_station_records(model_type)
  277. mapping = model_station_handler(models['data'])
  278. prod_data_handler(mapping, model_type)
  279. def configure_scheduler():
  280. # 创建调度器(可设置时区,如 timezone='Asia/Shanghai')
  281. scheduler = BlockingScheduler()
  282. # 配置第一个触发器:处理每个主小时段的前 60 分钟(0-50 分钟)
  283. trigger1 = CronTrigger(
  284. hour='3,5,11,17', # 主触发小时
  285. minute='0-50/10', # 每 10 分钟一次,覆盖 00:00-00:50
  286. timezone='Asia/Shanghai' # 按需设置时区
  287. )
  288. # 添加任务到调度器
  289. scheduler.add_job(target_job, trigger1)
  290. # 启动调度器
  291. try:
  292. logger.info("⏰ pre_prod_ftp:生产ftp定时任务已启动,按 Ctrl+C 退出")
  293. scheduler.start()
  294. except (KeyboardInterrupt, SystemExit):
  295. logger.info("⏹️ 定时任务已停止")
  296. @app.route('/pre_prod_ftp', methods=['POST'])
  297. def pre_prod_ftp():
  298. # 获取程序开始时间
  299. start_time = time.time()
  300. result = {}
  301. success = 0
  302. args = {}
  303. try:
  304. target_job()
  305. success = 1
  306. except Exception as e:
  307. my_exception = traceback.format_exc()
  308. my_exception.replace("\n", "\t")
  309. result['msg'] = my_exception
  310. logger.info("生产文件下发ftp出错:{}".format(my_exception))
  311. end_time = time.time()
  312. result['success'] = success
  313. result['args'] = args
  314. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  315. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  316. return result
  317. if __name__ == "__main__":
  318. configure_scheduler()
  319. from waitress import serve
  320. serve(app, host="0.0.0.0", port=10118)