David 4 dagen geleden
bovenliggende
commit
131aa65ef8

+ 4 - 4
app/common/config.yml

@@ -1,5 +1,5 @@
-docs_mapping:
-  basi: DQYC_IN_BASIC
+doc_mapping:
+  basic: DQYC_IN_BASIC
   station_info_w: DQYC_IN_PLANT_WIND
   station_info_d_w: DQYC_IN_PLANT_DETAIL_WIND
   station_info_s: DQYC_IN_PLANT_SOLAR
@@ -11,8 +11,8 @@ docs_mapping:
   power: DQYC_IN_HISTORY_POWER_LONG
   nwp_v: DQYC_IN_FORECAST_WEATHER # 多版本气象
   nwp_v_h: DQYC_IN_FORECAST_WEATHER_H
-  nwp_own: DQYC_IN_FORECAST_WEATHER_OWNER
-  nwp_own_h: DQYC_IN_FORECAST_WEATHER_OWNER_H  # 自有气象
+  nwp_own: DQYC_IN_FORECAST_WEATHER
+  nwp_own_h: DQYC_IN_FORECAST_WEATHER_H  # 自有气象
   env_wf: DQYC_IN_ACTUAL_WEATHER_WIND
   env_sf: DQYC_IN_ACTUAL_WEATHER_SOLAR'  # 实测气象
 

File diff suppressed because it is too large
+ 55 - 0
app/logs/2025-05-20/south-forecast.2025-05-20.0.log


+ 4 - 4
app/model/main.py

@@ -48,7 +48,7 @@ def main():
 
     # 生成任务列表
     all_stations = [str(child) for child in Path(opt.input_file).iterdir() if child.is_dir()]
-    task_func = partial(station_task, config=config)
+    # task_func = partial(station_task, config=config)
 
     # ---------------------------- 监控任务 ----------------------------
     # 进度跟踪
@@ -63,15 +63,15 @@ def main():
                 task_config['gpu_assignment'] = gpu_id
 
                 # 提交任务
-                future = executor.submit(task_func, sid, task_config)
+                future = executor.submit(station_task, sid, task_config)
                 future.add_done_callback(
                     lambda _: rc.release_gpu(task_config['gpu_assignment']))
                 futures.append(future)
 
             # 处理完成情况
             for future in futures:
-                result = future.result()
-                if result['status'] == 'success':
+                result = future._result
+                if result == 'success':
                     completed += 1
                 pbar.update(1)
                 pbar.set_postfix_str(f"Completed: {completed}/{len(all_stations)}")

+ 33 - 29
app/model/material.py

@@ -4,8 +4,10 @@
 # @Time      :2025/4/29 11:07
 # @Author    :David
 # @Company: shenyang JY
+import types
 import pandas as pd
 from pathlib import Path
+from app.common.config import logger, parser
 from concurrent.futures import ThreadPoolExecutor
 from functools import partial
 
@@ -15,48 +17,50 @@ class MaterialLoader:
         self.base_path = Path(base_path)
         self.lazy_load = lazy_load
         self._data_cache = {}
-        self.opt = args.parse_args_and_yaml()
+        self.opt = parser.parse_args_and_yaml()
 
-    def _load_material(self, station_path):
+    def wrapper_path(self, station_id, spec):
+        return f"{self.base_path/station_id/spec}.txt"
+
+    def _load_material(self, station_id):
         """核心数据加载方法"""
-        input_file = self.base_path / station_path
         # 根据您的原始代码逻辑简化的加载流程
         try:
-            basic = pd.read_csv(input_file / self.opt.doc_mapping['basic'], sep=r'\s+', header=0)
-            power = pd.read_csv(input_file / self.opt.doc_mapping['power'], sep=r'\s+', header=0)
+            basic = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['basic']), sep=r'\s+', header=0)
+            power = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['power']), sep=r'\s+', header=0)
             plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
             assert plant_type == 0 or plant_type == 1
             # 根据电站类型加载数据
-            nwp_v = pd.read_csv(input_file / '0' / self.opt.doc_mapping['nwp_v'], sep=r'\s+', header=0)
-            nwp_v_h = pd.read_csv(input_file / '0' / self.opt.doc_mapping['nwp_v_h'], sep=r'\s+', header=0)
-            nwp_own = pd.read_csv(input_file / '1' / self.opt.doc_mapping['nwp_own'], sep=r'\s+', header=0)
-            nwp_own_h = pd.read_csv(input_file / '1' / self.opt.doc_mapping['nwp_own_h'], sep=r'\s+', header=0)
+            nwp_v = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v']}"), sep=r'\s+', header=0)
+            nwp_v_h = pd.read_csv(self.wrapper_path(station_id, f"0/{self.opt.doc_mapping['nwp_v_h']}"), sep=r'\s+', header=0)
+            nwp_own = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own']}"), sep=r'\s+', header=0)
+            nwp_own_h = pd.read_csv(self.wrapper_path(station_id, f"1/{self.opt.doc_mapping['nwp_own_h']}"), sep=r'\s+', header=0)
             if self.opt.switch_nwp_owner:
                 nwp_v, nwp_v_h = nwp_own, nwp_own_h
             # 如果是风电
             if plant_type == 0:
-                station_info = pd.read_csv(input_file / self.opt.doc_mapping['station_info_w'], sep=r'\s+', header=0)
-                station_info_d = pd.read_csv(input_file / self.opt.doc_mapping['station_info_d_w'], sep=r'\s+', header=0)
-                nwp = pd.read_csv(input_file / self.opt.doc_mapping['nwp_w'], sep=r'\s+', header=0)
-                nwp_h = pd.read_csv(input_file / self.opt.doc_mapping['nwp_w_h'], sep=r'\s+', header=0)
+                station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_w']), sep=r'\s+', header=0)
+                station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_w']), sep=r'\s+', header=0)
+                nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w']), sep=r'\s+', header=0)
+                nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_w_h']), sep=r'\s+', header=0)
                 cap = float(station_info.loc[0, 'PlantCap'])
-                if (input_file / self.opt.doc_mapping['env_wf']).exists():
-                    env = pd.read_csv(input_file / self.opt.doc_mapping['env_wf'], sep=r'\s+', header=0)
+                if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf'])).exists():
+                    env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_wf']), sep=r'\s+', header=0)
                 else:
                     env = None
             # 如果是光伏
             else:
-                station_info = pd.read_csv(input_file / self.opt.doc_mapping['station_info_s'], sep=r'\s+', header=0)
-                station_info_d = pd.read_csv(input_file / self.opt.doc_mapping['station_info_d_s'], sep=r'\s+', header=0)
-                nwp = pd.read_csv(input_file / self.opt.doc_mapping['nwp_s'], sep=r'\s+', header=0)
-                nwp_h = pd.read_csv(input_file / self.opt.doc_mapping['nwp_s_h'], sep=r'\s+', header=0)
+                station_info = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_s']), sep=r'\s+', header=0)
+                station_info_d = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['station_info_d_s']), sep=r'\s+', header=0)
+                nwp = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s']), sep=r'\s+', header=0)
+                nwp_h = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['nwp_s_h']), sep=r'\s+', header=0)
                 cap = float(station_info.loc[0, 'PlantCap'])
-                if (input_file / self.opt.doc_mapping['env_sf']).exists():
-                    env = pd.read_csv(input_file / self.opt.doc_mapping['env_sf'], sep=r'\s+', header=0)
+                if Path(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf'])).exists():
+                    env = pd.read_csv(self.wrapper_path(station_id, self.opt.doc_mapping['env_sf']), sep=r'\s+', header=0)
                 else:
                     env = None
 
-            return {
+            return types.SimpleNamespace(**{
                 'station_info': station_info,
                 'station_info_d': station_info_d,
                 'nwp': nwp,
@@ -66,18 +70,18 @@ class MaterialLoader:
                 'nwp_v_h': nwp_v_h,
                 'env': env,
                 'cap': cap
-            }
+            })
         except Exception as e:
-            print(f"Error loading {station_path}: {str(e)}")
+            print(f"Error loading {station_id}: {str(e)}")
             return None
 
-    def get_material(self, station_path):
+    def get_material(self, station_id):
         if self.lazy_load:
-            if station_path not in self._data_cache:
-                self._data_cache[station_path] = self._load_material(station_path)
-            return self._data_cache[station_path]
+            if station_id not in self._data_cache:
+                self._data_cache[station_id] = self._load_material(station_id)
+            return self._data_cache[station_id]
         else:
-            return self._load_material(station_path)
+            return self._load_material(station_id)
 
 
 

+ 9 - 9
app/model/task_worker.py

@@ -12,22 +12,22 @@ from app.model.material import MaterialLoader
 
 
 def station_task(station_id, config):
-    """单个场站训练任务"""
+    """场站训练任务"""
     try:
+        print("111")
         mate = MaterialLoader(base_path=config['input_file'])
         # 动态生成场站数据路径
-        input_file = config['data_template'].format(station_id=station_id)
-
+        print("222")
         # 加载数据
-        data_objects = mate.get_material(input_file)
-
+        data_objects = mate.get_material(station_id)
+        print("333")
         # 数据合并
-        train_data = pd.merge(data_objects.nwp_v_h, data_objects.power, on=config['time_column'])
-
+        train_data = pd.merge(data_objects.nwp_v_h, data_objects.power, on=config['col_time'])
+        print("444")
         # 模型训练
-        model = ModelTrainer(input_file, train_data, capacity=config['capacities'][station_id], gpu_id=config.get('gpu_assignment'))
+        model = ModelTrainer(station_id, train_data, capacity=config['capacities'][station_id], gpu_id=config.get('gpu_assignment'))
         model.train()
-
+        print("555")
         return {'status': 'success', 'station_id': station_id}
     except Exception as e:
         logging.error(f"Station {station_id} failed: {str(e)}")

+ 5 - 11
app/model/tf_model_train.py

@@ -9,17 +9,12 @@ import logging
 import os, json
 import time
 import traceback
-from pathlib import Path
-from copy import deepcopy
 import pandas as pd
 from typing import Dict, Any
 from app.common.tf_lstm import TSHandler
 from app.common.dbmg import MongoUtils
 from app.common.data_handler import DataHandler, write_number_to_file
-from app.common.config import logger
-
-mgUtils = MongoUtils(logger)
-
+from app.common.config import logger, parser
 
 class ModelTrainer:
     """模型训练器封装类"""
@@ -40,12 +35,11 @@ class ModelTrainer:
         self._setup_resources()
 
         # 初始化组件
-        self.logger = logging.getLogger(self.__class__.__name__)
         self.input_file = input_file
-        self.args = args  # 从原始配置导入
-        self.dh = DataHandler(self.logger, self.args)
-        self.ts = TSHandler(self.logger, self.args)
-        self.mgUtils = MongoUtils(self.logger)
+        self.args = parser.parse_args_and_yaml()  # 从原始配置导入
+        self.dh = DataHandler(logger, parser)
+        self.ts = TSHandler(logger, parser)
+        self.mgUtils = MongoUtils(logger)
 
     def _setup_resources(self):
         """GPU资源分配"""

Some files were not shown because too many files changed in this diff