David 1 月之前
父节点
当前提交
27d1efbd5d
共有 1 个文件被更改,包括 17 次插入14 次删除
  1. 17 14
      data_processing/data_operation/pre_prod_ftp.py

+ 17 - 14
data_processing/data_operation/pre_prod_ftp.py

@@ -120,6 +120,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
     :param datetime_str: 日期时间字符串,用于文件名
     :param local_temp_dir: 本地临时目录路径
     """
+    download_success = False
     transport = None
     sftp = None
     try:
@@ -146,10 +147,10 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
 
                 try:
                     sftp.get(remote_path, local_path)
+                    download_success = True
                     logger.info(f"下载成功: {remote_path} -> {local_path}")
                 except Exception as e:
-                    print(f"文件下载失败 {remote_path}: {str(e)}")
-
+                    logger.info(f"文件下载失败 {remote_path}: {str(e)}")
     except paramiko.AuthenticationException:
         logger.info("认证失败,请检查用户名和密码")
     except paramiko.SSHException as e:
@@ -163,6 +164,7 @@ def download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
                 sftp.close()
             if transport and transport.is_active():
                 transport.close()
+        return download_success
 
 
 def upload_to_sftp(local_path: str, target_dir: str) -> bool:
@@ -294,20 +296,20 @@ def prod_data_handler(mappings, model_type):
         logger.info(f"目标时间: {datetime_str}")
 
         # 下载文件
-        download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
+        if download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type):
 
-        # 处理下载的文件
-        process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
+            # 处理下载的文件
+            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
 
-        # 创建最终ZIP
-        zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
+            # 创建最终ZIP
+            zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
 
-        # 上传打包ZIP文件
-        if upload_to_sftp(zip_path, f"/{model_type}"):
-            # 步骤3: 上传成功后清理
-            clean_up_file(zip_path)
-        else:
-            logger.info("[WARNING] 上传未成功,保留本地文件")
+            # 上传打包ZIP文件
+            if upload_to_sftp(zip_path, f"/{model_type}"):
+                # 步骤3: 上传成功后清理
+                clean_up_file(zip_path)
+            else:
+                logger.info("[WARNING] 上传未成功,保留本地文件")
 
 
 from apscheduler.schedulers.blocking import BlockingScheduler
@@ -342,4 +344,5 @@ def configure_scheduler():
         logger.info("⏹️ 定时任务已停止")
 
 if __name__ == "__main__":
-    configure_scheduler()
+    # configure_scheduler()
+    target_job()