123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2023/3/2 10:28
- # file: config.py
- # author: David
- # company: shenyang JY
- """
- 模型调参及系统功能配置
- """
- import concurrent.futures
- import types
- from pyexpat import features
- from tensorflow import add_n
- from tqdm import tqdm
- import pandas as pd
- from pathlib import Path
- from copy import deepcopy
- from concurrent.futures import ProcessPoolExecutor
- from app.common.config import parser, logger
- from app.model.resource_manager import ResourceController
- from app.model.task_worker import Task
- from app.model.material import MaterialLoader
- from multiprocessing import Manager, Lock
- """"
- 调用思路
- xxxx 1. 从入口参数中获取IN OUT文件位置 xxxx
- 2. 按照训练和预测加载和解析数据
- 3. 对数据进行预处理
- 4. 执行训练,保存模型,输出状态
- 5. 执行预测,输出结果,输出状态
- """
- """
- 训练任务
- 1.将一个省份下的所有场站加入队列
- 2.队列中的每个场站是一个子任务,还有最终的区域级子任务
- """
def add_nwp(df_obj, df):
    """Add one station's NWP frame into a running element-wise total.

    ``main`` uses this as the reduce step of the per-station fan-out: the
    totals are later divided by the summed station capacity.

    Args:
        df_obj: Running total so far; an empty DataFrame before the first call.
        df: The frame to add to the total.

    Returns:
        The updated running total. On the first call (``df_obj`` empty) this is
        a *copy* of ``df``, so later in-place additions never write through to
        the caller's frame. Identifier columns (``PlantID``, ``PlantName``,
        ``PlantType``, ``Qbsj``, ``Datetime``) keep the values from the first
        frame. Every other column of the running total is summed with the
        same-named column of ``df``, rows aligned on the index. A value that is
        missing on only one side counts as 0.
        NOTE(review): row alignment is by index, so frames are presumably
        expected to share a common index; confirm against station_task.
    """
    if df_obj.empty:
        # Copy so that the in-place column assignment on later calls does not
        # mutate the first station's frame.
        return df.copy()
    meta_cols = {'PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime'}
    add_cols = [col for col in df_obj.columns if col not in meta_cols]
    # Restrict the incoming frame to the summed columns. If the identifier
    # columns took part in ``add``, strings/dates would be added to the fill
    # value 0 (TypeError). The result would also have more columns than
    # ``add_cols``, and the positional column assignment below would fail.
    other = df.reindex(columns=add_cols)
    summed = df_obj[add_cols].add(other, fill_value=0)
    df_obj[add_cols] = summed[add_cols]
    return df_obj
# Columns that identify a station/time row rather than carry NWP values;
# they are excluded from capacity normalisation.
_META_COLS = ('PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime')
# Keys of the per-station NWP frames found in ``result['weights']`` returned
# by ``Task.station_task``, in the order they are handed to ``region_task``.
_NWP_KEYS = ('nwp', 'nwp_h', 'nwp_v', 'nwp_v_h')


def _normalize_by_capacity(df, total_cap):
    """Divide every non-identifier column of ``df`` by ``total_cap`` in place, rounding to 2 decimals."""
    cols = [col for col in df.columns if col not in _META_COLS]
    if cols:
        df[cols] = (df[cols] / total_cap).round(2)
    return df


def _train_stations(rc, task, config, all_stations):
    """Train every station in a process pool and aggregate their NWP frames.

    Each station gets its own deep copy of ``config`` with ``station_id`` and
    ``gpu_assignment`` set. The GPU is released when that station's future
    completes. For each successful station, the frames in
    ``result['weights']`` are summed with ``add_nwp`` and ``result['weights']['cap']``
    is added to the total capacity. The sums are then divided by that total.
    Presumably each station's frames are already multiplied by its capacity,
    making the result a capacity-weighted average; confirm in station_task.

    Returns:
        ``(data_nwps, completed)``: ``data_nwps`` is a SimpleNamespace with
        attributes ``nwp``, ``nwp_h``, ``nwp_v``, ``nwp_v_h`` and
        ``total_cap``; ``completed`` is the number of successful stations.
    """
    n_total = len(all_stations)
    completed = 0
    total_cap = 0
    sums = {key: pd.DataFrame() for key in _NWP_KEYS}
    with tqdm(total=n_total) as pbar:
        with ProcessPoolExecutor(max_workers=rc.cpu_cores) as executor:
            futures = {}
            for sid in all_stations:
                # Dynamically assign a GPU per station task.
                task_config = deepcopy(config)
                gpu_id = rc.get_gpu()
                task_config['gpu_assignment'] = gpu_id
                task_config['station_id'] = sid
                future = executor.submit(task.station_task, task_config)
                # Bind gpu_id now via a default argument. A closure over
                # task_config would read it when the callback runs, after the
                # loop has rebound it, and release the wrong GPU.
                future.add_done_callback(lambda _f, gid=gpu_id: rc.release_gpu(gid))
                futures[future] = sid
            for future in concurrent.futures.as_completed(futures):
                sid = futures[future]
                try:
                    result = future.result()
                    if result['status'] == 'success':
                        local = result['weights']
                        for key in _NWP_KEYS:
                            sums[key] = add_nwp(sums[key], local[key])
                        # Count the station only after its frames were added.
                        total_cap += local['cap']
                        completed += 1
                except Exception as e:
                    # One failed station must not abort the remaining ones.
                    print(f"Task failed for station {sid}: {e}")
                finally:
                    # Advance for failures too, so the bar reaches its total.
                    pbar.update(1)
                    pbar.set_postfix_str(f"Completed: {completed}/{n_total}")
    # Skip normalisation when nothing was accumulated (avoids dividing by 0).
    if total_cap:
        for frame in sums.values():
            _normalize_by_capacity(frame, total_cap)
    data_nwps = types.SimpleNamespace(**sums, total_cap=total_cap)
    return data_nwps, completed


def _train_region(rc, task, config, data_nwps):
    """Run the region-level training task on one GPU, releasing it afterwards."""
    task_config = deepcopy(config)
    gpu_id = rc.get_gpu()
    task_config['gpu_assignment'] = gpu_id
    try:
        task.region_task(task_config, data_nwps)
    finally:
        # Return the GPU to the pool even if region_task raises.
        rc.release_gpu(gpu_id)


def main():
    """Entry point: train all station models, then the region-level model.

    Steps:
      1. Parse arguments and YAML config (merging fixed and task parameters).
      2. Build the resource controller (CPU worker count and GPU pool).
      3. Train every station, i.e. every sub-directory of ``opt.input_file``,
         in a process pool and aggregate their NWP frames.
      4. Train the region-level model on the aggregated NWP, unless no
         station succeeded.
    """
    # ---------------------------- parse arguments ----------------------------
    opt = parser.parse_args_and_yaml()
    config = opt.__dict__
    logger.info(f"输入文件目录: {opt.input_file}")
    # ---------------------------- resources and tasks ------------------------
    rc = ResourceController(
        max_workers=opt.system['max_workers'],
        gpu_list=opt.system['gpu_devices'],
    )
    # One station sub-task per directory under the input folder.
    all_stations = [child.name for child in Path(opt.input_file).iterdir() if child.is_dir()]
    loader = MaterialLoader(opt.input_file)
    task = Task(loader)
    # ---------------------------- station-level training ---------------------
    data_nwps, completed = _train_stations(rc, task, config, all_stations)
    print(f"Final result: {completed} stations trained successfully")
    if completed == 0 or not data_nwps.total_cap:
        print("No station produced usable weights; skipping region-level training")
        return
    # ---------------------------- region-level training ----------------------
    _train_region(rc, task, config, data_nwps)


if __name__ == "__main__":
    main()
|