David пре 1 месец
родитељ
комит
42306b6cac
2 измењених фајлова са 331 додато и 93 уклоњено
  1. 233 4
      data_processing/data_operation/pre_prod_ftp.py
  2. 98 89
      data_processing/data_operation/test.py

+ 233 - 4
data_processing/data_operation/pre_prod_ftp.py

@@ -26,14 +26,30 @@ import json
 import os
 import paramiko
 import zipfile
-from io import BytesIO
-from datetime import datetime
+from datetime import datetime, timedelta
+from typing import Optional
+import shutil
+import tempfile
 api_url = 'http://itil.jiayuepowertech.com:9958/itil/api/stationModelConfig'
 nick_name = {
     '0': 'seer',
     '1': 'koi',
     '2': 'lucky'
 }
+
+# Source SFTP server used by download_files_via_sftp.
+# NOTE(review): credentials are hard-coded in source — consider loading them
+# from the environment or a secrets store instead.
+SFTP_HOST = '192.168.1.33'
+SFTP_PORT = 2022
+SFTP_USERNAME = 'liudawei'
+SFTP_PASSWORD = 'liudawei@123'
+# Destination SFTP server used by upload_to_sftp.
+# NOTE(review): these look like placeholder values — confirm before running.
+DEST_SFTP_HOST = 'dest_sftp.example.com'
+DEST_SFTP_PORT = 22
+DEST_SFTP_USERNAME = 'dest_username'
+DEST_SFTP_PASSWORD = 'dest_password'
+DEFAULT_TARGET_DIR = 'cdq'  # Default upload directory
+
+
 def fetch_station_records(model_type, is_current=1):
     """
     调用接口获取场站记录
@@ -56,7 +72,7 @@ def fetch_station_records(model_type, is_current=1):
         return {"error": "接口返回非JSON数据"}
 
 
-def process_station_data(api_data):
+def model_station_handler(api_data):
     """
     处理接口数据,生成三级映射关系
     :param api_data: 接口返回的原始数据(假设为字典列表)
@@ -77,9 +93,222 @@ def process_station_data(api_data):
     return mapping
 
 
+def get_next_target_time(current_time=None):
+    """Return the next target time slot (00:00, 06:00, 12:00 or 18:00).
+
+    :param current_time: reference datetime; datetime.now() is used when None
+    :return: current_time truncated to the first target hour that is greater
+        than its hour, or midnight of the following day when none remains
+    """
+    if current_time is None:
+        current_time = datetime.now()
+
+    target_hours = [0, 6, 12, 18]
+    current_hour = current_time.hour
+
+    for hour in sorted(target_hours):
+        if current_hour < hour:
+            return current_time.replace(hour=hour, minute=0, second=0, microsecond=0)
+
+    # Past every target hour of today: fall through to 00:00 of the next day
+    return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
+
+
+def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
+    """
+    Open an SFTP connection to the source server and download the mapped ZIP files.
+
+    For each engineer in ``mappings`` the remote directory ``/<engineer>/`` is entered and,
+    for each model-version key, ``jy_<engineer>.<model-version joined by '.'>_<datetime>.zip``
+    is fetched into ``local_temp_dir``. Connection and download errors are printed,
+    never raised.
+
+    :param mappings: mapping of engineer name -> collection keyed by model-version
+        sequences of strings (only the keys are used here)
+    :param datetime_str: timestamp string used in the file name
+    :param local_temp_dir: local directory that receives the downloaded files
+    :param model_type: model type of the current run; kept so existing callers keep
+        working, it no longer decides whether the connection is closed
+    """
+    transport = None
+    sftp = None
+    try:
+        # Open the SSH transport to the source server
+        transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
+        transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
+
+        # Create the SFTP client on top of the transport
+        sftp = paramiko.SFTPClient.from_transport(transport)
+
+        # Download the files engineer by engineer
+        for engineer in mappings:
+            # Use a per-engineer local so the caller's datetime_str is not overwritten
+            # for the engineers that follow in the loop.
+            # NOTE(review): the fixed timestamp for non-'koi' engineers is carried over
+            # from the original code and looks like test data — confirm.
+            file_datetime = datetime_str if engineer == 'koi' else '2025012000'
+            remote_base = f"/{engineer}/"
+            try:
+                sftp.chdir(remote_base)
+            except FileNotFoundError:
+                print(f"工程师目录不存在: {remote_base}")
+                continue
+
+            for model_version in mappings[engineer]:
+                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{file_datetime}.zip"
+                remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
+                local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
+
+                try:
+                    sftp.get(remote_path, local_path)
+                    print(f"下载成功: {remote_path} -> {local_path}")
+                except Exception as e:
+                    # Best effort: one missing file must not abort the remaining downloads
+                    print(f"文件下载失败 {remote_path}: {str(e)}")
+
+    except paramiko.AuthenticationException:
+        print("认证失败,请检查用户名和密码")
+    except paramiko.SSHException as e:
+        print(f"SSH连接异常: {str(e)}")
+    except Exception as e:
+        print(f"未知错误: {str(e)}")
+    finally:
+        # Every call opens its own connection, so it must be released on every call,
+        # not only for the last model type.
+        if sftp:
+            sftp.close()
+        if transport and transport.is_active():
+            transport.close()
+
+
+def upload_to_sftp(local_path: str, target_dir: str) -> bool:
+    """Upload a local file to the destination SFTP server.
+
+    Args:
+        local_path: Path of the local file to upload.
+        target_dir: Remote directory that receives the file.
+
+    Returns:
+        True when the upload succeeded, False otherwise (the error is printed, not raised).
+    """
+    transport: Optional[paramiko.Transport] = None
+    sftp: Optional[paramiko.SFTPClient] = None
+
+    try:
+        # Make sure the local file exists before connecting
+        if not os.path.isfile(local_path):
+            raise FileNotFoundError(f"本地文件不存在: {local_path}")
+
+        # Open the SFTP connection
+        transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
+        transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
+        sftp = paramiko.SFTPClient.from_transport(transport)
+
+        # Upload under the same base name inside target_dir
+        # NOTE(review): target_dir is not created here — presumably it must already exist remotely; confirm.
+        remote_filename = os.path.basename(local_path)
+        remote_path = f"{target_dir}/{remote_filename}"
+        sftp.put(local_path, remote_path)
+        print(f"[SUCCESS] 上传完成: {remote_path}")
+        return True
+
+    except Exception as e:
+        print(f"[ERROR] 上传失败: {str(e)}")
+        return False
+    finally:
+        # Always release the connection resources
+        if sftp:
+            sftp.close()
+        if transport and transport.is_active():
+            transport.close()
+
+
+def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
+    """Extract every downloaded ZIP file and collect the mapped station directories.
+
+    For each engineer / model-version pair the expected ZIP in ``local_temp_dir`` is
+    extracted to a scratch directory; every sub-directory whose name is one of the mapped
+    station codes is copied into ``final_collect_dir`` unless a directory of that name
+    is already there.
+
+    :param mappings: mapping of engineer -> {model-version sequence -> station codes}
+    :param local_temp_dir: directory holding the downloaded ZIP files
+    :param datetime_str: timestamp string used in the file name
+    :param final_collect_dir: directory that receives the collected station directories
+    """
+    for engineer in mappings:
+        # Use a per-engineer local so the caller's datetime_str is not overwritten
+        # for the engineers that follow in the loop.
+        # NOTE(review): the fixed timestamp for non-'koi' engineers is carried over
+        # from the original code and looks like test data — confirm.
+        file_datetime = datetime_str if engineer == 'koi' else '2025012000'
+        for model_version in mappings[engineer]:
+            # NOTE(review): download_files_via_sftp saves files as "..._<datetime>.zip"
+            # (no "_dq" suffix), so this path may never exist — confirm the intended naming.
+            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{file_datetime}_dq.zip"
+            zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
+            station_codes = mappings[engineer][model_version]
+
+            if not os.path.exists(zip_path):
+                continue
+
+            # Scratch directory for the extracted content, removed when the block exits
+            with tempfile.TemporaryDirectory() as temp_extract:
+                # Extract the ZIP file
+                try:
+                    with zipfile.ZipFile(zip_path, 'r') as zf:
+                        zf.extractall(temp_extract)
+                except zipfile.BadZipFile:
+                    print(f"无效的ZIP文件: {zip_path}")
+                    continue
+
+                # Collect the station directories found anywhere in the extracted tree
+                for root, dirs, _ in os.walk(temp_extract):
+                    for dir_name in dirs:
+                        if dir_name not in station_codes:
+                            continue
+                        src = os.path.join(root, dir_name)
+                        dest = os.path.join(final_collect_dir, dir_name)
+
+                        # First copy wins: an already collected station is left untouched
+                        if not os.path.exists(dest):
+                            shutil.copytree(src, dest)
+                            print(f"已收集场站: {dir_name}")
+
+
+def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
+    """Create the final ZIP archive and return its path.
+
+    Args:
+        final_collect_dir: Source directory whose files are packed.
+        datetime_str: Timestamp string used in the archive name.
+        model_type: Model type identifier used in the archive name.
+
+    Returns:
+        Path of the generated ZIP file (under the relative '../cache/ftp' directory).
+
+    Raises:
+        Exception: any error raised while writing the archive is printed and re-raised.
+    """
+    # Make sure the cache directory exists (relative to the current working directory)
+    os.makedirs('../cache/ftp', exist_ok=True)
+
+    # Build the standardized file name
+    zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
+    output_path = os.path.join('../cache/ftp', zip_filename)
+
+    try:
+        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
+            for root, _, files in os.walk(final_collect_dir):
+                for file in files:
+                    file_path = os.path.join(root, file)
+                    # Store entries relative to final_collect_dir
+                    arcname = os.path.relpath(file_path, final_collect_dir)
+                    zf.write(file_path, arcname)
+        print(f"[SUCCESS] ZIP创建成功: {output_path}")
+        return output_path
+    except Exception as e:
+        print(f"[ERROR] 创建ZIP失败: {str(e)}")
+        raise
+
+
+def clean_up_file(file_path: str) -> None:
+    """Delete a local file if it exists; a failure is printed as a warning, not raised."""
+    try:
+        if os.path.exists(file_path):
+            os.remove(file_path)
+            print(f"[CLEANUP] 已删除本地文件: {file_path}")
+    except Exception as e:
+        print(f"[WARNING] 文件删除失败: {str(e)}")
+
+def prod_data_handler(mappings):
+    # Run the download -> collect -> zip -> upload pipeline once per model type,
+    # each run inside its own temporary working directory.
+    for model_type in ['cdq', 'dq', 'zcq']:
+        with tempfile.TemporaryDirectory() as local_temp_dir:
+            final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
+            os.makedirs(final_collect_dir, exist_ok=True)
+
+            # Compute the target time
+            target_time = get_next_target_time()
+            datetime_str = target_time.strftime("%Y%m%d%H")
+            print(f"目标时间: {datetime_str}")
+            # NOTE(review): the computed target time printed above is discarded here in
+            # favour of a hard-coded timestamp — looks like a debugging leftover; confirm.
+            datetime_str = '2025012412'
+
+            # Download the files
+            download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
+
+            # Process the downloaded files
+            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
+
+            # Create the final ZIP
+            zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
 
+            # Upload the packed ZIP file
+            if upload_to_sftp(zip_path, f"/{model_type}"):
+                # Step 3: clean up after a successful upload
+                clean_up_file(zip_path)
+            else:
+                print("[WARNING] 上传未成功,保留本地文件")
 
 if __name__ == "__main__":
     models = fetch_station_records(1)
-    mapping = process_station_data(models['data'])
+    mapping = model_station_handler(models['data'])
+    prod_data_handler(mapping)
     print(mapping)

+ 98 - 89
data_processing/data_operation/test.py

@@ -4,9 +4,10 @@
 # @Time      :2025/3/13 14:19
 # @Author    :David
 # @Company: shenyang JY
-
+from enum import Enum
 import paramiko
 from datetime import datetime, timedelta
+from typing import Optional
 import os
 import zipfile
 import shutil
@@ -27,10 +28,9 @@ DEFAULT_TARGET_DIR = 'cdq'  # 默认上传目录
 # 更新后的三级映射
 MAPPINGS = {
     'koi': {('Zone', '1.0'): {'J00645'}},
-    'lucky': {}, 'seer': {('lgb', '1.0'): {'J00001'}}
+    'lucky': {}, 'seer': {('lgb', '1.0'): {'J00578'}}
 }
 
-
 def get_next_target_time(current_time=None):
     """获取下一个目标时刻"""
     if current_time is None:
@@ -47,31 +47,7 @@ def get_next_target_time(current_time=None):
     return (current_time + timedelta(days=1)).replace(hour=0, minute=0, second=0, microsecond=0)
 
 
-def download_sftp_files(sftp, mappings, datetime_str, local_temp_dir):
-    """下载所有需要的SFTP文件"""
-    for engineer in mappings:
-        remote_base = f"/{engineer}/"  # SFTP根目录下的工程师目录
-        try:
-            sftp.chdir(remote_base)
-        except FileNotFoundError:
-            print(f"工程师目录不存在: {remote_base}")
-            continue
-
-        for model_version in mappings[engineer]:
-            # 构造目标文件名(模型版本已经合并)
-
-            target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
-            remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
-            local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
-
-            try:
-                sftp.get(remote_path, local_path)
-                print(f"下载成功: {remote_path}")
-            except Exception as e:
-                print(f"文件下载失败 {remote_path}: {str(e)}")
-
-
-def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
+def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
     """
     封装SFTP连接和文件下载的完整流程
     :param mappings: 文件映射配置
@@ -90,6 +66,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
 
         # 执行文件下载
         for engineer in mappings:
+            datetime_str = datetime_str if engineer == 'koi' else 2025012000
             remote_base = f"/{engineer}/"
             try:
                 sftp.chdir(remote_base)
@@ -98,7 +75,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
                 continue
 
             for model_version in mappings[engineer]:
-                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
+                target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}.zip"
                 remote_path = os.path.join(remote_base, target_file).replace("\\", "/")
                 local_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
 
@@ -115,42 +92,49 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir):
     except Exception as e:
         print(f"未知错误: {str(e)}")
     finally:
-        # 确保连接关闭
-        if sftp:
-            sftp.close()
-        if transport and transport.is_active():
-            transport.close()
+        # 遍历到最后一个中短期,确保连接关闭
+        if model_type == 'zcq':
+            if sftp:
+                sftp.close()
+            if transport and transport.is_active():
+                transport.close()
+
+
+def upload_to_sftp(local_path: str, target_dir: str) -> bool:
+    """上传文件到SFTP服务器
+
+    Args:
+        local_path: 本地文件路径
+        target_dir: 远程目标目录
+
+    Returns:
+        上传是否成功 (True/False)
+    """
+    transport: Optional[paramiko.Transport] = None
+    sftp: Optional[paramiko.SFTPClient] = None
 
-def upload_to_sftp(local_path, target_dir):
-    """上传文件到目标SFTP服务器"""
-    transport = None
-    sftp = None
     try:
-        # 创建新的传输连接
+        # 验证本地文件存在
+        if not os.path.isfile(local_path):
+            raise FileNotFoundError(f"本地文件不存在: {local_path}")
+
+        # 创建SFTP连接
         transport = paramiko.Transport((DEST_SFTP_HOST, DEST_SFTP_PORT))
         transport.connect(username=DEST_SFTP_USERNAME, password=DEST_SFTP_PASSWORD)
         sftp = paramiko.SFTPClient.from_transport(transport)
 
-        # 确保目标目录存在
-        try:
-            sftp.chdir(target_dir)
-        except FileNotFoundError:
-            sftp.mkdir(target_dir)
-            print(f"已创建远程目录: {target_dir}")
-
-        # 构造远程路径
-        filename = os.path.basename(local_path)
-        remote_path = f"{target_dir}/{filename}"
-
         # 执行上传
+        remote_filename = os.path.basename(local_path)
+        remote_path = f"{target_dir}/{remote_filename}"
         sftp.put(local_path, remote_path)
-        print(f"成功上传到: {remote_path}")
+        print(f"[SUCCESS] 上传完成: {remote_path}")
+        return True
 
     except Exception as e:
-        print(f"上传失败: {str(e)}")
-        raise
+        print(f"[ERROR] 上传失败: {str(e)}")
+        return False
     finally:
-        # 确保连接关闭
+        # 确保资源释放
         if sftp:
             sftp.close()
         if transport and transport.is_active():
@@ -159,6 +143,7 @@ def upload_to_sftp(local_path, target_dir):
 def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
     """处理所有下载的ZIP文件并收集场站目录"""
     for engineer in mappings:
+        datetime_str = datetime_str if engineer == 'koi' else 2025012000
         for model_version in mappings[engineer]:
             target_file = f"jy_{engineer}.{'.'.join(model_version)}_{datetime_str}_dq.zip"
             zip_path = os.path.join(local_temp_dir, target_file).replace("\\", "/")
@@ -189,51 +174,75 @@ def process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir):
                                 print(f"已收集场站: {dir_name}")
 
 
-def create_final_zip(final_collect_dir, datetime_str, output_path="result.zip"):
-    """创建最终打包的ZIP文件"""
-    with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
-        for root, dirs, files in os.walk(final_collect_dir):
-            for file in files:
-                file_path = os.path.join(root, file)
-                arcname = os.path.relpath(file_path, final_collect_dir)
-                zf.write(file_path, arcname)
-    print(f"最终打包完成: {output_path}")
+def create_final_zip(final_collect_dir: str, datetime_str: str, model_type: str) -> str:
+    """创建ZIP压缩包并返回完整路径
 
+    Args:
+        final_collect_dir: 需要打包的源目录
+        datetime_str: 时间戳字符串
+        model_type: 模型类型标识
 
+    Returns:
+        生成的ZIP文件完整路径
+    """
+    # 确保缓存目录存在
+    os.makedirs('../cache/ftp', exist_ok=True)
 
+    # 构造标准化文件名
+    zip_filename = f"jy_algo_{datetime_str}_{model_type}.zip"
+    output_path = os.path.join('../cache/ftp', zip_filename)
 
-def main():
-    # 创建临时工作目录
-    with tempfile.TemporaryDirectory() as local_temp_dir:
-        final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
-        os.makedirs(final_collect_dir, exist_ok=True)
+    try:
+        with zipfile.ZipFile(output_path, 'w', zipfile.ZIP_DEFLATED) as zf:
+            for root, _, files in os.walk(final_collect_dir):
+                for file in files:
+                    file_path = os.path.join(root, file)
+                    arcname = os.path.relpath(file_path, final_collect_dir)
+                    zf.write(file_path, arcname)
+        print(f"[SUCCESS] ZIP创建成功: {output_path}")
+        return output_path
+    except Exception as e:
+        print(f"[ERROR] 创建ZIP失败: {str(e)}")
+        raise
 
-        # 计算目标时间
-        target_time = get_next_target_time()
-        datetime_str = target_time.strftime("%Y%m%d%H")
-        datetime_str = '2025012118'
-        print(f"目标时间: {datetime_str}")
 
-        # 连接SFTP
-        # transport = paramiko.Transport((SFTP_HOST, SFTP_PORT))
-        # transport.connect(username=SFTP_USERNAME, password=SFTP_PASSWORD)
-        # sftp = paramiko.SFTPClient.from_transport(transport)
+def clean_up_file(file_path: str) -> None:
+    """安全删除本地文件"""
+    try:
+        if os.path.exists(file_path):
+            os.remove(file_path)
+            print(f"[CLEANUP] 已删除本地文件: {file_path}")
+    except Exception as e:
+        print(f"[WARNING] 文件删除失败: {str(e)}")
+
+def prod_data_handler(mappings):
+    # 创建临时工作目录
+    for model_type in ['cdq', 'dq', 'zcq']:
+        with tempfile.TemporaryDirectory() as local_temp_dir:
+            final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
+            os.makedirs(final_collect_dir, exist_ok=True)
 
-        # 下载文件
-        download_files_via_sftp(MAPPINGS, datetime_str, local_temp_dir)
+            # 计算目标时间
+            target_time = get_next_target_time()
+            datetime_str = target_time.strftime("%Y%m%d%H")
+            print(f"目标时间: {datetime_str}")
+            datetime_str = '2025012412'
 
-        # 关闭SFTP连接
-        # sftp.close()
-        # transport.close()
+            # 下载文件
+            download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
 
-        # 处理下载的文件
-        process_zips(MAPPINGS, local_temp_dir, datetime_str, final_collect_dir)
+            # 处理下载的文件
+            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
 
-        # 创建最终ZIP
-        create_final_zip(final_collect_dir, datetime_str)
+            # 创建最终ZIP
+            zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
 
-        # 上传打包ZIP文件
-        upload_to_sftp()
+            # 上传打包ZIP文件
+            if upload_to_sftp(zip_path, f"/{model_type}"):
+                # 步骤3: 上传成功后清理
+                clean_up_file(zip_path)
+            else:
+                print("[WARNING] 上传未成功,保留本地文件")
 
 if __name__ == "__main__":
-    main()
+    # prod_data_handler requires the mappings argument; calling it bare raises TypeError.
+    # Pass the module-level MAPPINGS table, as the removed main() did.
+    prod_data_handler(MAPPINGS)