#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :pre_prod_ftp.py
# @Time :2025/3/4 13:02
# @Author :David
# @Company: shenyang JY
"""
What this service does:
1. Call the station-model configuration API and, keyed on
   type (ultra-short-term / short-term / mid-term) and
   algorithm engineer / model / version / station code,
   collect every station model that is currently configured.
2. From that configuration and the current time slot, build the station-code set
   for each type (ultra-short-term / short-term / mid-term).
   The station-code set contributed by one algorithm engineer is obtained from the
   test FTP as follows:
   (1) For each model/version zip under the engineer's directory, download the file
       that matches the current time slot from the FTP and unpack it.
   (2) Pick the station-code directories configured for that model/version and
       combine them into the engineer's station-code set.
   (3) Merge the sets of all engineers into the station-code set for the type.
3. Pack the results into three zip files, one per type (ultra-short-term /
   short-term / mid-term), and upload them to the production FTP.
"""
import requests
import json
import os
import paramiko
import zipfile, traceback
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
from typing import Optional
from pytz import timezone
import shutil, time
import tempfile

app = Flask('pre_prod_ftp——service')
api_url = 'http://itil.jiayuepowertech.com:9958/itil/api/stationModelConfig'

# algorithm engineer id (engineerName in the API) -> nickname / FTP directory
nick_name = {
    '0': 'seer',
    '1': 'koi',
    '2': 'lucky'
}
# model type key -> paramModelType code (0 ultra-short-term, 1 short-term, 2 mid-term)
model_type_dict = {
    'cdq': '0',
    'dq': '1',
    'zcq': '2'
}
# source (test) SFTP server, where the engineers' model zips are downloaded from
SFTP_HOST = '192.168.1.33'
SFTP_PORT = 2022
SFTP_USERNAME = 'liudawei'
SFTP_PASSWORD = 'liudawei@123'
# destination (production) SFTP server, where the packaged zips are uploaded to
DEST_SFTP_HOST = '192.168.1.33'
DEST_SFTP_PORT = 2025
DEST_SFTP_USERNAME = 'liudawei'
DEST_SFTP_PASSWORD = 'liudawei@123'

from common.logs import Log
logger = Log('pre_prod_ftp').logger

def fetch_station_records(model_type, is_current=1):
    """
    Fetch the station records from the configuration API.
    :param model_type: model type key ('cdq' ultra-short-term, 'dq' short-term, 'zcq' mid-term)
    :param is_current: whether the model is currently enabled (1 or 0)
    :return: parsed JSON response, or a dict with an "error" message
    """
    params = {
        "paramModelType": model_type_dict[str(model_type)],
        "paramIsCurrent": str(is_current)  # the API expects string parameters
    }
    try:
        response = requests.get(api_url, params=params, timeout=10)
        response.raise_for_status()  # raise on HTTP errors
        return response.json()       # the API is expected to return JSON
    except requests.exceptions.RequestException as e:
        return {"error": f"request failed: {str(e)}"}
    except json.JSONDecodeError:
        return {"error": "the API returned non-JSON data"}

def model_station_handler(api_data):
    """
    Turn the raw API records into a three-level mapping:
    engineer -> (model name, model version) -> set of station codes.
    :param api_data: list of record dicts returned by the API
    :return: structured mapping consumed by the download / packaging steps
    """
    # one sub-dict per known engineer nickname
    mapping = {"lucky": {}, "seer": {}, "koi": {}}
    # walk every station record
    for record in api_data:
        # pull out the key fields (adjust the names here if the API fields change)
        engineer = nick_name.get(record.get("engineerName"), "unknown")
        model_name = record.get("modelName")
        model_version = record.get("modelVersion")
        station_code = record.get("stationCode")
        assert engineer in mapping
        if all([engineer, model_name, model_version, station_code]):
            mapping[engineer].setdefault((model_name, model_version), set()).add(station_code)
    return mapping
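# Example of the resulting structure (station codes and models are hypothetical);
# this is what download_files_via_sftp() and process_zips() iterate over:
#
#   {'seer':  {('Zone', 'v1.0'): {'J00001', 'J00002'}},
#    'koi':   {},
#    'lucky': {('Trend', 'v2.1'): {'J00003'}}}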

def get_next_target_time(current_time=None):
    """Return the next dispatch slot (00 / 06 / 12 / 18 o'clock, Asia/Shanghai)."""
    if current_time is None:
        current_time = datetime.now(timezone('Asia/Shanghai'))
    target_hours = [0, 6, 12, 18]
    current_hour = current_time.hour
    for hour in sorted(target_hours):
        if current_hour <= hour:
            return current_time.replace(hour=hour, minute=0, second=0, microsecond=0)
    # past the last slot of the day: roll over to 00:00 of the next day
    return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
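# Examples of the slot selection (times are hypothetical):
#   03:10 -> 06:00 the same day
#   11:40 -> 12:00 the same day
#   19:05 -> 00:00 the next day
# When the current hour equals a slot hour the slot of that same hour is returned
# (e.g. 06:30 -> 06:00); the cron hours used below (3, 5, 11, 17) never coincide
# with a slot hour, so that edge case is not hit by the scheduler.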

def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
    """
    Open an SFTP connection to the test server and download the configured zip files.
    :param mappings: engineer -> (model name, model version) -> station codes
    :param datetime_str: time-slot string used in the remote file names
    :param local_temp_dir: local temporary directory to download into
    :param model_type: model type key ('cdq' / 'dq' / 'zcq'), part of the file names
    :return: True if at least one file was downloaded
    """
    download_success = False
    transport = None
    sftp = None
    try:
        # open the SSH transport
        transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
        transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
        # create the SFTP client
        sftp = paramiko.SFTPClient.from_transport(transport)
        # download one zip per engineer / model / version
        for engineer in mappings:
            remote_base = f"/{engineer}/"
            try:
                sftp.chdir(remote_base)
            except FileNotFoundError:
                logger.info(f"engineer directory does not exist: {remote_base}")
                continue
            for model_version in mappings[engineer]:
                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_{model_type}.zip"
                remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
                local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
                try:
                    sftp.get(remote_path, local_path)
                    download_success = True
                    logger.info(f"download succeeded: {remote_path} -> {local_path}")
                except Exception as e:
                    logger.info(f"download failed {remote_path}: {str(e)}")
    except paramiko.AuthenticationException:
        logger.info("authentication failed, check the username and password")
    except paramiko.SSHException as e:
        logger.info(f"SSH connection error: {str(e)}")
    except Exception as e:
        logger.info(f"unexpected error: {str(e)}")
    finally:
        # the transport is created per call, so always close it before returning
        if sftp:
            sftp.close()
        if transport and transport.is_active():
            transport.close()
    return download_success

def upload_to_sftp(local_path: str, target_dir: str) -> bool:
    """Upload a file to the production SFTP server.
    Args:
        local_path: path of the local file
        target_dir: remote target directory
    Returns:
        True if the upload succeeded, False otherwise
    """
    transport: Optional[paramiko.Transport] = None
    sftp: Optional[paramiko.SFTPClient] = None
    try:
        # make sure the local file exists
        if not os.path.isfile(local_path):
            raise FileNotFoundError(f"local file does not exist: {local_path}")
        # open the SFTP connection
        transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
        transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
        sftp = paramiko.SFTPClient.from_transport(transport)
        # upload the file
        remote_filename = os.path.basename(local_path)
        remote_path = f"{target_dir}/{remote_filename}"
        sftp.put(local_path, remote_path)
        logger.info(f"[SUCCESS] upload finished: {remote_path}")
        return True
    except Exception as e:
        logger.info(f"[ERROR] upload failed: {str(e)}")
        return False
    finally:
        # always release the connection
        if sftp:
            sftp.close()
        if transport and transport.is_active():
            transport.close()
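# Illustrative call (the local path is hypothetical); prod_data_handler() below uses
# the packaged zip and "/<model_type>" as the target directory:
#
#   upload_to_sftp('/tmp/jy_algo_2025030406_dq.zip', '/dq')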

def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir, model_type):
    """Unpack every downloaded zip and collect the configured station directories."""
    for engineer in mappings:
        for model_version in mappings[engineer]:
            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_{model_type}.zip"
            zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
            station_codes = mappings[engineer][model_version]
            if not os.path.exists(zip_path):
                continue
            # unpack into a temporary directory
            with tempfile.TemporaryDirectory() as temp_extract:
                try:
                    with zipfile.ZipFile(zip_path, 'r') as zf:
                        zf.extractall(temp_extract)
                except zipfile.BadZipFile:
                    logger.info(f"invalid zip file: {zip_path}")
                    continue
                # copy the station directories configured for this model / version
                for root, dirs, files in os.walk(temp_extract):
                    for dir_name in dirs:
                        if dir_name in station_codes:
                            src = os.path.join(root, dir_name)
                            dest = os.path.join(final_collect_dir, dir_name)
                            if not os.path.exists(dest):
                                shutil.copytree(src, dest)
                                logger.info(f"collected station: {dir_name}")

def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
    """Create the final zip archive and return its full path.
    Args:
        final_collect_dir: source directory to pack
        datetime_str: time-slot string
        model_type: model type key
    Returns:
        full path of the generated zip file
    """
    ftp_dir = os.path.dirname(os.path.dirname(__file__))
    output_dir = os.path.join(ftp_dir, 'cache', 'ftp')
    # make sure the cache directory exists
    os.makedirs(output_dir, exist_ok=True)
    # build the standardized file name
    zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
    output_path = os.path.join(output_dir, zip_filename)
    try:
        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
            for root, _, files in os.walk(final_collect_dir):
                for file in files:
                    file_path = os.path.join(root, file)
                    arcname = os.path.relpath(file_path, final_collect_dir)
                    zf.write(file_path, arcname)
        logger.info(f"[SUCCESS] zip created: {output_path}")
        return output_path
    except Exception as e:
        logger.info(f"[ERROR] failed to create zip: {str(e)}")
        raise
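# Assuming this module lives at <project>/app/pre_prod_ftp.py (layout assumed), the
# archive for the hypothetical slot 2025030406 and type 'dq' would be written to:
#
#   <project>/cache/ftp/jy_algo_2025030406_dq.zip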

def clean_up_file(file_path: str) -> None:
    """Safely delete a local file."""
    try:
        if os.path.exists(file_path):
            os.remove(file_path)
            logger.info(f"[CLEANUP] local file deleted: {file_path}")
    except Exception as e:
        logger.info(f"[WARNING] failed to delete file: {str(e)}")

def prod_data_handler(mappings, model_type):
    """Download, repackage and upload the data for one model type."""
    # work inside a temporary directory
    with tempfile.TemporaryDirectory() as local_temp_dir:
        final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
        os.makedirs(final_collect_dir, exist_ok=True)
        # determine the target time slot
        target_time = get_next_target_time()
        datetime_str = target_time.strftime("%Y%m%d%H")
        logger.info(f"target time: {datetime_str}")
        # download the source zips
        if download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
            # unpack them and collect the station directories
            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir, model_type)
            # build the final zip
            zip_path = create_final_zip(final_collect_dir, datetime_str, model_type)
            # upload the packaged zip
            if upload_to_sftp(zip_path, f"/{model_type}"):
                # clean up only after a successful upload
                clean_up_file(zip_path)
            else:
                logger.info("[WARNING] upload failed, keeping the local file")

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger

def target_job():
    """Scheduled job: fetch the configuration and ship the packaged data for every type."""
    for model_type in ['cdq', 'dq', 'zcq']:
        models = fetch_station_records(model_type)
        if not isinstance(models, dict) or 'data' not in models:
            logger.info(f"failed to fetch station records for {model_type}: {models}")
            continue
        mapping = model_station_handler(models['data'])
        prod_data_handler(mapping, model_type)

def configure_scheduler():
    # use a background scheduler so that start() returns and the waitress server
    # in __main__ can run afterwards
    scheduler = BackgroundScheduler(timezone='Asia/Shanghai')
    # trigger: every 10 minutes during the first 50 minutes of each trigger hour
    trigger1 = CronTrigger(
        hour='3,5,11,17',   # trigger hours
        minute='0-50/10',   # :00, :10, ..., :50
        timezone='Asia/Shanghai'
    )
    # register the job and start the (non-blocking) scheduler
    scheduler.add_job(target_job, trigger1)
    scheduler.start()
    logger.info("⏰ pre_prod_ftp: production FTP scheduled job started")

@app.route('/pre_prod_ftp', methods=['POST'])
def pre_prod_ftp():
    # record the start time
    start_time = time.time()
    result = {}
    success = 0
    args = {}
    try:
        target_job()
        success = 1
    except Exception as e:
        my_exception = traceback.format_exc()
        my_exception = my_exception.replace("\n", "\t")
        result['msg'] = my_exception
        logger.info("production FTP delivery failed: {}".format(my_exception))
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    return result
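# Illustrative manual trigger and response shape (timestamps are hypothetical):
#
#   curl -X POST http://localhost:10118/pre_prod_ftp
#   # {"success": 1, "args": {}, "start_time": "2025-03-04 13:02:00",
#   #  "end_time": "2025-03-04 13:05:12"}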

if __name__ == "__main__":
    # start the cron job first (the background scheduler does not block),
    # then serve the HTTP endpoint
    configure_scheduler()
    from waitress import serve
    serve(app, host="0.0.0.0", port=10118)