David 2 天之前
父節點
當前提交
209bafbf50
共有 4 個文件被更改,包括 71 次插入4 次删除
  1. 3 0
      app/common/config.yml
  2. 1 1
      app/model/main.py
  3. 64 0
      app/model/material.py
  4. 3 3
      app/model/task_worker.py

+ 3 - 0
app/common/config.yml

@@ -15,6 +15,9 @@ doc_mapping:
   nwp_own_h: DQYC_IN_FORECAST_WEATHER_H  # 自有气象
   env_wf: DQYC_IN_ACTUAL_WEATHER_WIND
   env_sf: DQYC_IN_ACTUAL_WEATHER_SOLAR'  # 实测气象
+doc_area_mapping:
+  basic: DQYC_AREA_IN_BASIC
+  power: DQYC_AREA_IN_HISTORY_POWER_LONG
 
 switch_nwp_owner: false
 system:

+ 1 - 1
app/model/main.py

@@ -78,7 +78,7 @@ def main():
 
     print(f"Final result: {completed} stations trained successfully")
     # 区域级功率预测训练
-    region_task(all_stations, config)
+    region_task(config)
 
 if __name__ == "__main__":
     main()

+ 64 - 0
app/model/material.py

@@ -4,6 +4,7 @@
 # @Time      :2025/4/29 11:07
 # @Author    :David
 # @Company: shenyang JY
+import os.path
 import types
 import pandas as pd
 from pathlib import Path
@@ -18,6 +19,11 @@ class MaterialLoader:
         self.lazy_load = lazy_load
         self._data_cache = {}
         self.opt = parser.parse_args_and_yaml()
+        self.sum_cap = 0
+        self.weighted_nwp = pd.DataFrame()
+        self.weighted_nwp_h = pd.DataFrame()
+        self.weighted_nwp_v = pd.DataFrame()
+        self.weighted_nwp_v_h = pd.DataFrame()
 
     def wrapper_path(self, station_id, spec):
         return f"{self.base_path/station_id/spec}.txt"
@@ -79,11 +85,69 @@ class MaterialLoader:
         if self.lazy_load:
             if station_id not in self._data_cache:
                 self._data_cache[station_id] = self._load_material(station_id)
+                self.add_weights(self._data_cache[station_id])
             return self._data_cache[station_id]
         else:
             return self._load_material(station_id)
 
+    def add_weights(self, data_objects):
+        """对nwp数据进行cap加权(nwp, nwp_h, nwp_v_, nwp_v_h)"""
 
+        def sum_df(df_obj, df, weight):
+            """内部函数:对DataFrame进行加权求和"""
+            columns_to_scale = [col for col in df.columns if col not in ['PlantID', 'PlantName', 'Datetime']]
+
+            if not df_obj.empty:
+                # 验证列名一致性
+                assert set(df_obj.columns) == set(df.columns), "DataFrame列不匹配"
+                # 向量化操作:仅对数值列进行加权累加
+                df_obj[columns_to_scale] += df[columns_to_scale] * weight
+            else:
+                # 初始化操作:复制结构并加权数值列
+                df_obj = df.copy()
+                df_obj[columns_to_scale] = df[columns_to_scale] * weight
+            return df_obj
+
+        # 从data_objects解构对象
+        nwp, nwp_h, nwp_v, nwp_v_h, power, cap = (
+            data_objects.nwp,
+            data_objects.nwp_h,
+            data_objects.nwp_v,
+            data_objects.nwp_v_h,
+            data_objects.power,
+            data_objects.cap
+        )
+
+        # 累加总容量(用于后续归一化)
+        self.sum_cap += cap
+
+        # 对每个NWP数据集进行容量加权
+        self.weighted_nwp = sum_df(self.weighted_nwp, nwp, cap)
+        self.weighted_nwp_h = sum_df(self.weighted_nwp_h, nwp_h, cap)
+        self.weighted_nwp_v = sum_df(self.weighted_nwp_v, nwp_v, cap)
+        self.weighted_nwp_v_h = sum_df(self.weighted_nwp_v_h, nwp_v_h, cap)
+
+    def get_material_region(self):
+        try:
+            basic = pd.read_csv(os.path.join(self.base_path, self.opt.doc_area_mapping['basic']), sep=r'\s+', header=0)
+            power = pd.read_csv(os.path.join(self.base_path, self.opt.doc_area_mapping['power']), sep=r'\s+', header=0)
+            plant_type = int(basic.loc[basic['PropertyID'].tolist().index('PlantType'), 'Value'])
+            area_id = int(basic.loc[basic['PropertyID'].tolist().index('AreaId'), 'Value'])
+            assert plant_type == 0 or plant_type == 1
+            area_cap = float(basic.loc[basic['PropertyID'].tolist().index('AreaCap'), 'Value'])
+            columns_to_scale = [col for col in self.weighted_nwp.columns if col not in ['PlantID', 'PlantName', 'Datetime']]
+            self.weighted_nwp[columns_to_scale] /= self.sum_cap
+            return types.SimpleNamespace(**{
+                'nwp': self.weighted_nwp,
+                'nwp_h': self.weighted_nwp_h,
+                'power': power,
+                'nwp_v': self.weighted_nwp_v,
+                'nwp_v_h': self.weighted_nwp_v_h,
+                'area_cap': area_cap
+            })
+        except Exception as e:
+            print(f"Region Error loading: {str(e)}")
+            return None
 
 if __name__ == "__main__":
     run_code = 0

+ 3 - 3
app/model/task_worker.py

@@ -8,6 +8,7 @@
 import logging
 import pandas as pd
 from app.model.tf_model_train import ModelTrainer
+from app.model.tf_region_train import RegionTrainer
 from app.model.material import MaterialLoader
 
 
@@ -36,7 +37,7 @@ def station_task(config):
         return {'status': 'failed', 'station_id': station_id}
 
 
-def region_task(all_stations, config):
+def region_task(config):
     """区域级训练任务"""
     try:
         print("111")
@@ -45,13 +46,12 @@ def region_task(all_stations, config):
         # 动态生成场站数据路径
         print("222")
         # 加载数据
-        data_objects = mate.get_material(station_id)
+        data_objects = mate.get_material_region()
         print("333")
         # 数据合并
         train_data = pd.merge(data_objects.nwp_v_h, data_objects.power, on=config['col_time'])
         print("444")
         # 模型训练
-        # model = ModelTrainer(station_id, train_data, capacity=data_objects.cap, gpu_id=config.get('gpu_assignment'))
         model = ModelTrainer(train_data, capacity=data_objects.cap, config=config)
         model.train()
         print("555")