David 2 weeks ago
parent
commit
d0a011ee0e

+ 32 - 0
data_processing/data_operation/ftp.yaml

@@ -0,0 +1,32 @@
+anweiguo:
+  modeler: seer
+  password: anweiguo@123
+  user: anweiguo
+huangzihan:
+  modeler: hzh
+  password: huangzihan@123
+  user: huangzihan
+liudawei:
+  modeler: koi
+  password: liudawei@123
+  user: liudawei
+wulianlong:
+  modeler: lucky
+  password: wulianlong@123
+  user: wulianlong
+host: 192.168.1.33
+local_dir: ./data_processing/cache/data
+port_dev: 2022
+port_prd: 2025
+zip_mode: w
+nickname:
+  '0': 'seer'
+  '1': 'koi'
+  '2': 'lucky'
+  '3': 'hzh'
+model_type_dict:
+  cdq: '0'
+  dq: '1'
+  zcq: '2'
+host_dev: '192.168.1.33'
+host_prd: '192.168.1.33'

+ 4 - 24
data_processing/data_operation/pre_data_ftp.py

@@ -4,6 +4,7 @@
 # @Time      :2024/12/26 14:18
 # @Author    :David
 # @Company: shenyang JY
+import yaml
 from ftplib import FTP
 import pandas as pd
 from flask import Flask, request, jsonify
@@ -15,31 +16,10 @@ from common.logs import Log
 logger = Log('data-processing').logger
 app = Flask('pre_data_ftp——service')
 
-ftp_params = {
-    'local_dir' : './data_processing/cache/data',
-    # 'host' : '192.168.12.20',
-    # 'port': 32121,
-    'host': '192.168.1.33',
-    'port_dev': 2022,
-    'port_prd': 2025,
-    'zip_mode': 'w',
-    'liudawei' : {
-        'user' : 'liudawei',
-        'password' : 'liudawei@123',
-        'modeler' : 'koi'
-    },
-    'anweiguo':{
-        'user' : 'anweiguo',
-        'password' : 'anweiguo@123',
-        'modeler' : 'seer'
-    },
-    'wulianlong':{
-        'user': 'wulianlong',
-        'password': 'wulianlong@123',
-        'modeler': 'lucky'
-    }
-}
 
+current_dir = os.path.dirname(os.path.abspath(__file__))
+with open(os.path.join(current_dir, 'ftp.yaml'), 'r', encoding='utf-8') as f:
+    ftp_params = yaml.safe_load(f)  # 只读的全局配置
 def get_moment_next(schedule_dt=False):
     if schedule_dt:
         now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')

+ 15 - 20
data_processing/data_operation/pre_prod_ftp.py

@@ -22,7 +22,7 @@
 """
 import requests
 import json
-import os
+import os, yaml
 import paramiko
 import zipfile, traceback
 from flask import Flask, request, jsonify
@@ -34,26 +34,21 @@ import tempfile
 app = Flask('pre_prod_ftp——service')
 
 api_url = 'http://itil.jiayuepowertech.com:9958/itil/api/stationModelConfig'
-nick_name = {
-    '0': 'seer',
-    '1': 'koi',
-    '2': 'lucky'
-}
-model_type_dict = {
-    'cdq': '0',
-    'dq': '1',
-    'zcq': '2'
-}
+current_dir = os.path.dirname(os.path.abspath(__file__))
+with open(os.path.join(current_dir, 'ftp.yaml'), 'r', encoding='utf-8') as f:
+    ftp_config = yaml.safe_load(f)  # 只读的全局配置
+nick_name = ftp_config['nickname']
+model_type_dict = ftp_config['model_type_dict']
 # 配置信息
-SFTP_HOST = '192.168.1.33'
-SFTP_PORT = 2022
-SFTP_USERNAME = 'liudawei'
-SFTP_PASSWORD = 'liudawei@123'
+SFTP_HOST = ftp_config['host_dev']
+SFTP_PORT = ftp_config['port_dev']
+SFTP_USERNAME = ftp_config['liudawei']['user']
+SFTP_PASSWORD = ftp_config['liudawei']['password']
 # 在原配置部分添加以下配置
-DEST_SFTP_HOST = '192.168.1.33'
-DEST_SFTP_PORT = 2025
-DEST_SFTP_USERNAME = 'liudawei'
-DEST_SFTP_PASSWORD = 'liudawei@123'
+DEST_SFTP_HOST = ftp_config['host_prd']
+DEST_SFTP_PORT = ftp_config['port_prd']
+DEST_SFTP_USERNAME = ftp_config['liudawei']['user']
+DEST_SFTP_PASSWORD = ftp_config['liudawei']['password']
 
 from common.logs import Log
 logger = Log('pre_prod_ftp').logger
@@ -87,7 +82,7 @@ def model_station_handler(api_data):
     :return: 可用于生成表格的结构化数据
     """
     # 创建映射字典
-    mapping = {"lucky":{}, "seer":{}, "koi":{}}
+    mapping = {v:{} for k, v in nick_name.items()}
     # 遍历每条场站记录
     for record in api_data:
         # 提取关键字段(根据实际接口字段名称修改)

+ 0 - 248
data_processing/data_operation/test.py

@@ -1,248 +0,0 @@
-#!/usr/bin/env python
-# -*- coding:utf-8 -*-
-# @FileName  :test.py
-# @Time      :2025/3/13 14:19
-# @Author    :David
-# @Company: shenyang JY
-from enum import Enum
-import paramiko
-from datetime import datetime, timedelta
-from typing import Optional
-import os
-import zipfile
-import shutil
-import tempfile
-
-# 配置信息
-SFTP_HOST = '192.168.1.33'
-SFTP_PORT = 2022
-SFTP_USERNAME = 'liudawei'
-SFTP_PASSWORD = 'liudawei@123'
-# 在原配置部分添加以下配置
-DEST_SFTP_HOST = 'dest_sftp.example.com'
-DEST_SFTP_PORT = 22
-DEST_SFTP_USERNAME = 'dest_username'
-DEST_SFTP_PASSWORD = 'dest_password'
-DEFAULT_TARGET_DIR = 'cdq'  # 默认上传目录
-
-# 更新后的三级映射
-MAPPINGS = {
-    'koi': {('Zone', '1.0'): {'J00645'}},
-    'lucky': {}, 'seer': {('lgb', '1.0'): {'J00578'}}
-}
-
-def get_next_target_time(current_time=None):
-    """获取下一个目标时刻"""
-    if current_time is None:
-        current_time = datetime.now()
-
-    target_hours = [0, 6, 12, 18]
-    current_hour = current_time.hour
-
-    for hour in sorted(target_hours):
-        if current_hour < hour:
-            return current_time.replace(hour=hour, minute=0, second=0, microsecond=0)
-
-    # 如果当前时间超过所有目标小时,使用次日0点
-    return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
-
-
-def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
-    """
-    封装SFTP连接和文件下载的完整流程
-    :param mappings: 文件映射配置
-    :param datetime_str: 日期时间字符串,用于文件名
-    :param local_temp_dir: 本地临时目录路径
-    """
-    transport = None
-    sftp = None
-    try:
-        # 创建SSH传输通道
-        transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
-        transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
-
-        # 创建SFTP客户端
-        sftp = paramiko.SFTPClient.from_transport(transport)
-
-        # 执行文件下载
-        for engineer in mappings:
-            datetime_str = datetime_str if engineer == 'koi' else 2025012000
-            remote_base = f"/{engineer}/"
-            try:
-                sftp.chdir(remote_base)
-            except FileNotFoundError:
-                print(f"工程师目录不存在: {remote_base}")
-                continue
-
-            for model_version in mappings[engineer]:
-                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip"
-                remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
-                local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
-
-                try:
-                    sftp.get(remote_path, local_path)
-                    print(f"下载成功: {remote_path} -> {local_path}")
-                except Exception as e:
-                    print(f"文件下载失败 {remote_path}: {str(e)}")
-
-    except paramiko.AuthenticationException:
-        print("认证失败,请检查用户名和密码")
-    except paramiko.SSHException as e:
-        print(f"SSH连接异常: {str(e)}")
-    except Exception as e:
-        print(f"未知错误: {str(e)}")
-    finally:
-        # 遍历到最后一个中短期,确保连接关闭
-        if model_type == 'zcq':
-            if sftp:
-                sftp.close()
-            if transport and transport.is_active():
-                transport.close()
-
-
-def upload_to_sftp(local_path: str, target_dir: str) -> bool:
-    """上传文件到SFTP服务器
-
-    Args:
-        local_path: 本地文件路径
-        target_dir: 远程目标目录
-
-    Returns:
-        上传是否成功 (True/False)
-    """
-    transport: Optional[paramiko.Transport] = None
-    sftp: Optional[paramiko.SFTPClient] = None
-
-    try:
-        # 验证本地文件存在
-        if not os.path.isfile(local_path):
-            raise FileNotFoundError(f"本地文件不存在: {local_path}")
-
-        # 创建SFTP连接
-        transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
-        transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
-        sftp = paramiko.SFTPClient.from_transport(transport)
-
-        # 执行上传
-        remote_filename = os.path.basename(local_path)
-        remote_path = f"{target_dir}/{remote_filename}"
-        sftp.put(local_path, remote_path)
-        print(f"[SUCCESS] 上传完成: {remote_path}")
-        return True
-
-    except Exception as e:
-        print(f"[ERROR] 上传失败: {str(e)}")
-        return False
-    finally:
-        # 确保资源释放
-        if sftp:
-            sftp.close()
-        if transport and transport.is_active():
-            transport.close()
-
-def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
-    """处理所有下载的ZIP文件并收集场站目录"""
-    for engineer in mappings:
-        datetime_str = datetime_str if engineer == 'koi' else 2025012000
-        for model_version in mappings[engineer]:
-            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
-            zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
-            station_codes = mappings[engineer][model_version]
-
-            if not os.path.exists(zip_path):
-                continue
-
-            # 创建临时解压目录
-            with tempfile.TemporaryDirectory() as temp_extract:
-                # 解压ZIP文件
-                try:
-                    with zipfile.ZipFile(zip_path, 'r') as zf:
-                        zf.extractall(temp_extract)
-                except zipfile.BadZipFile:
-                    print(f"无效的ZIP文件: {zip_path}")
-                    continue
-
-                # 收集场站目录
-                for root, dirs, files in os.walk(temp_extract):
-                    for dir_name in dirs:
-                        if dir_name in station_codes:
-                            src = os.path.join(root, dir_name)
-                            dest = os.path.join(final_collect_dir, dir_name)
-
-                            if not os.path.exists(dest):
-                                shutil.copytree(src, dest)
-                                print(f"已收集场站: {dir_name}")
-
-
-def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
-    """创建ZIP压缩包并返回完整路径
-
-    Args:
-        final_collect_dir: 需要打包的源目录
-        datetime_str: 时间戳字符串
-        model_type: 模型类型标识
-
-    Returns:
-        生成的ZIP文件完整路径
-    """
-    # 确保缓存目录存在
-    os.makedirs('../cache/ftp', exist_ok=True)
-
-    # 构造标准化文件名
-    zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
-    output_path = os.path.join('../cache/ftp', zip_filename)
-
-    try:
-        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
-            for root, _, files in os.walk(final_collect_dir):
-                for file in files:
-                    file_path = os.path.join(root, file)
-                    arcname = os.path.relpath(file_path, final_collect_dir)
-                    zf.write(file_path, arcname)
-        print(f"[SUCCESS] ZIP创建成功: {output_path}")
-        return output_path
-    except Exception as e:
-        print(f"[ERROR] 创建ZIP失败: {str(e)}")
-        raise
-
-
-def clean_up_file(file_path: str) -> None:
-    """安全删除本地文件"""
-    try:
-        if os.path.exists(file_path):
-            os.remove(file_path)
-            print(f"[CLEANUP] 已删除本地文件: {file_path}")
-    except Exception as e:
-        print(f"[WARNING] 文件删除失败: {str(e)}")
-
-def prod_data_handler(mappings):
-    # 创建临时工作目录
-    for model_type in ['cdq', 'dq', 'zcq']:
-        with tempfile.TemporaryDirectory() as local_temp_dir:
-            final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
-            os.makedirs(final_collect_dir, exist_ok=True)
-
-            # 计算目标时间
-            target_time = get_next_target_time()
-            datetime_str = target_time.strftime("%Y%m%d%H")
-            print(f"目标时间: {datetime_str}")
-            datetime_str = '2025012412'
-
-            # 下载文件
-            download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
-
-            # 处理下载的文件
-            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
-
-            # 创建最终ZIP
-            zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
-
-            # 上传打包ZIP文件
-            if upload_to_sftp(zip_path, f"/{model_type}"):
-                # 步骤3: 上传成功后清理
-                clean_up_file(zip_path)
-            else:
-                print("[WARNING] 上传未成功,保留本地文件")
-
-if __name__ == "__main__":
-    prod_data_handler()