David 3 mesi fa
parent
commit
3184cbd9da
1 ha cambiato i file con 60 aggiunte e 28 eliminazioni
  1. 60 28
      data_processing/data_operation/pre_prod_ftp.py

+ 60 - 28
data_processing/data_operation/pre_prod_ftp.py

@@ -36,7 +36,11 @@ nick_name = {
     '1': 'koi',
     '2': 'lucky'
 }
-
+model_type_dict = {
+    'cdq': '0',
+    'dq': '1',
+    'czq': '2'
+}
 # 配置信息
 SFTP_HOST = '192.168.1.33'
 SFTP_PORT = 2022
@@ -58,7 +62,7 @@ def fetch_station_records(model_type, is_current=1):
     :return: 场站记录列表或错误信息
     """
     params = {
-        "paramModelType": str(model_type),
+        "paramModelType": model_type_dict[str(model_type)],
         "paramIsCurrent": str(is_current)  # 适配接口参数格式
     }
 
@@ -278,37 +282,65 @@ def clean_up_file(file_path: str) -> None:
     except Exception as e:
         print(f"[WARNING] 文件删除失败: {str(e)}")
 
-def prod_data_handler(mappings):
+def prod_data_handler(mappings, model_type):
     # 创建临时工作目录
-    for model_type in ['cdq', 'dq', 'zcq']:
-        with tempfile.TemporaryDirectory() as local_temp_dir:
-            final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
-            os.makedirs(final_collect_dir, exist_ok=True)
+    with tempfile.TemporaryDirectory() as local_temp_dir:
+        final_collect_dir = os.path.join(local_temp_dir, 'collected_stations')
+        os.makedirs(final_collect_dir, exist_ok=True)
+
+        # 计算目标时间
+        target_time = get_next_target_time()
+        datetime_str = target_time.strftime("%Y%m%d%H")
+        print(f"目标时间: {datetime_str}")
+        datetime_str = '2025012412'
+
+        # 下载文件
+        download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
+
+        # 处理下载的文件
+        process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
 
-            # 计算目标时间
-            target_time = get_next_target_time()
-            datetime_str = target_time.strftime("%Y%m%d%H")
-            print(f"目标时间: {datetime_str}")
-            datetime_str = '2025012412'
+        # 创建最终ZIP
+        zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
 
-            # 下载文件
-            download_files_via_sftp(mappings, datetime_str, local_temp_dir, model_type)
+        # 上传打包ZIP文件
+        if upload_to_sftp(zip_path, f"/{model_type}"):
+            # 步骤3: 上传成功后清理
+            clean_up_file(zip_path)
+        else:
+            print("[WARNING] 上传未成功,保留本地文件")
 
-            # 处理下载的文件
-            process_zips(mappings, local_temp_dir, datetime_str, final_collect_dir)
 
-            # 创建最终ZIP
-            zip_path  = create_final_zip(final_collect_dir, datetime_str, model_type)
+from apscheduler.schedulers.blocking import BlockingScheduler
+from apscheduler.triggers.cron import CronTrigger
 
-            # 上传打包ZIP文件
-            if upload_to_sftp(zip_path, f"/{model_type}"):
-                # 步骤3: 上传成功后清理
-                clean_up_file(zip_path)
-            else:
-                print("[WARNING] 上传未成功,保留本地文件")
+def target_job():
+    """目标方法,实际任务逻辑在此实现"""
+    for model_type in ['cdq', 'dq', 'zcq']:
+        models = fetch_station_records(model_type)
+        mapping = model_station_handler(models['data'])
+        prod_data_handler(mapping, model_type)
+
+def configure_scheduler():
+    # 创建调度器(可设置时区,如 timezone='Asia/Shanghai')
+    scheduler = BlockingScheduler()
+
+    # 配置第一个触发器:处理每个主小时段的前 60 分钟(0-50 分钟)
+    trigger1 = CronTrigger(
+        hour='0,6,12,18',  # 主触发小时
+        minute='0-50/10',  # 每 10 分钟一次,覆盖 00:00-00:50
+        timezone='Asia/Shanghai'  # 按需设置时区
+    )
+
+    # 添加任务到调度器
+    scheduler.add_job(target_job, trigger1)
+
+    # 启动调度器
+    try:
+        print("⏰ 定时任务已启动,按 Ctrl+C 退出")
+        scheduler.start()
+    except (KeyboardInterrupt, SystemExit):
+        print("⏹️ 定时任务已停止")
 
 if __name__ == "__main__":
-    models = fetch_station_records(1)
-    mapping = model_station_handler(models['data'])
-    prod_data_handler(mapping)
-    print(mapping)
+    configure_scheduler()