12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :pre_prod_ftp.py
- # @Time :2025/3/4 13:02
- # @Author :David
- # @Company: shenyang JY
- """
- 要实现的功能:
- 1. 获取场站-配置接口,根据:
- 类型(超短期 短期 中期)
- 算法工程师 / 模型 版本 / 场站编码
- 获取所有当前配置的场站模型
- 2. 根据场站模型配置和时刻,形成:
- 类型(超短期 短期 中期)
- 场站编码集合
- 其中,通过算法工程师 / 模型 版本从测试FTP上获取场站编码集合的步骤如下:
- (1)遍历算法工程师下的 模型 版本 zip文件,根据当前时刻,从FTP上下载, 解压
- (2)根据 模型 版本 下的 场站编码 依次获取,并组装成这个算法工程师的场站编码集合
- (3)合并所有算法工程是的场站编码集合,形成类型下的场站编码集合
- 3. 压缩成类型(超短期 短期 中期)三个zip文件,上传生产FTP
- """
- from collections import defaultdict
- import requests
- import json
- import os
- import paramiko
- import zipfile
- from io import BytesIO
- from datetime import datetime
- api_url = 'http://itil.jiayuepowertech.com:9958/itil/api/stationModelConfig'
- nick_name = {
- '0': 'seer',
- '1': 'koi',
- '2': 'lucky'
- }
- def fetch_station_records(model_type, is_current=1):
- """
- 调用接口获取场站记录
- :paramModelType: 模型类型 0 超短期 1 短期 2 中期
- :paramIsCurrent: 模型启动状态(如 1 或 0)
- :return: 场站记录列表或错误信息
- """
- params = {
- "paramModelType": str(model_type),
- "paramIsCurrent": str(is_current) # 适配接口参数格式
- }
- try:
- response = requests.get(api_url, params=params, timeout=10)
- response.raise_for_status() # 检查HTTP错误
- return response.json() # 假设接口返回JSON
- except requests.exceptions.RequestException as e:
- return {"error": f"请求失败: {str(e)}"}
- except json.JSONDecodeError:
- return {"error": "接口返回非JSON数据"}
- def process_station_data(api_data):
- """
- 处理接口数据,生成三级映射关系
- :param api_data: 接口返回的原始数据(假设为字典列表)
- :return: 可用于生成表格的结构化数据
- """
- # 创建映射字典
- mapping = {"lucky":{}, "seer":{}, "koi":{}}
- # 遍历每条场站记录
- for record in api_data:
- # 提取关键字段(根据实际接口字段名称修改)
- engineer = nick_name.get(record.get("engineerName"), "unknown")
- model_name = record.get("modelName")
- model_version = record.get("modelVersion")
- station_code = record.get("stationCode")
- assert engineer in mapping
- if all([engineer, model_name, model_version, station_code]):
- mapping[engineer].setdefault((model_name, model_version), set()).add(station_code)
- return mapping
- if __name__ == "__main__":
- models = fetch_station_records(1)
- mapping = process_station_data(models['data'])
- print(mapping)
|