main.py 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/3/2 10:28
  4. # file: config.py
  5. # author: David
  6. # company: shenyang JY
  7. """
  8. 模型调参及系统功能配置
  9. """
  10. import concurrent.futures
  11. import os.path
  12. import types
  13. from tqdm import tqdm
  14. import pandas as pd
  15. from pathlib import Path
  16. from copy import deepcopy
  17. from concurrent.futures import ProcessPoolExecutor
  18. from app.common.config import parser, logger
  19. from app.model.resource_manager import ResourceController
  20. from app.model.task_worker import TaskTrain, TaskPre
  21. from app.model.material import MaterialLoader
  22. from multiprocessing import Manager, Lock
  23. """"
  24. 调用思路
  25. xxxx 1. 从入口参数中获取IN OUT文件位置 xxxx
  26. 2. 按照训练和预测加载和解析数据
  27. 3. 对数据进行预处理
  28. 4. 执行训练,保存模型,输出状态
  29. 5. 执行预测,输出结果,输出状态
  30. """
  31. """
  32. 训练任务
  33. 1.将一个省份下的所有场站加入队列
  34. 2.队列中的每个场站是一个子任务,还有最终的区域级子任务
  35. """
  36. def add_nwp(df_obj, df):
  37. if df_obj.empty:
  38. df_obj = df
  39. else:
  40. add_cols = [col for col in df_obj.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
  41. df_obj[add_cols] = df_obj[add_cols].add(df, fill_value=0)
  42. return df_obj
  43. def dq_train(opt):
  44. # ---------------------------- 配置计算资源和任务 ----------------------------
  45. config = opt.__dict__
  46. # 初始化资源管理器
  47. rc = ResourceController(
  48. max_workers=opt.system['max_workers'],
  49. gpu_list=opt.system['gpu_devices']
  50. )
  51. # 生成任务列表
  52. target_dir = os.path.join(opt.dqyc_base_path, opt.input_file)
  53. all_stations = [str(child.parts[-1]) for child in Path(str(target_dir)).iterdir() if child.is_dir()]
  54. loader = MaterialLoader(target_dir)
  55. task = TaskTrain(loader)
  56. # ---------------------------- 监控任务,进度跟踪 ----------------------------
  57. # 场站级功率预测训练
  58. completed = 0
  59. with tqdm(total=len(all_stations)) as pbar:
  60. with ProcessPoolExecutor(max_workers=rc.cpu_cores) as executor:
  61. futures = []
  62. for sid in all_stations:
  63. # 动态分配GPU
  64. task_config = deepcopy(config)
  65. gpu_id = rc.get_gpu()
  66. task_config['gpu_assignment'] = gpu_id
  67. task_config['station_id'] = sid
  68. # 提交任务
  69. future = executor.submit(task.station_task, task_config)
  70. future.add_done_callback(
  71. lambda _: rc.release_gpu(task_config['gpu_assignment']))
  72. futures.append(future)
  73. total_cap = 0
  74. weighted_nwp = pd.DataFrame()
  75. weighted_nwp_h = pd.DataFrame()
  76. weighted_nwp_v = pd.DataFrame()
  77. weighted_nwp_v_h = pd.DataFrame()
  78. # 处理完成情况
  79. for future in concurrent.futures.as_completed(futures):
  80. try:
  81. result = future.result()
  82. if result['status'] == 'success':
  83. # 分治-汇总策略得到加权后的nwp
  84. completed += 1
  85. local = result['weights']
  86. total_cap += local['cap']
  87. weighted_nwp = add_nwp(weighted_nwp, local['nwp'])
  88. weighted_nwp_h = add_nwp(weighted_nwp_h, local['nwp_h'])
  89. weighted_nwp_v = add_nwp(weighted_nwp_v, local['nwp_v'])
  90. weighted_nwp_v_h = add_nwp(weighted_nwp_v_h, local['nwp_v_h'])
  91. pbar.update(1)
  92. pbar.set_postfix_str(f"Completed: {completed}/{len(all_stations)}")
  93. except Exception as e:
  94. print(f"Task failed: {e}")
  95. # 归一化处理
  96. use_cols = [col for col in weighted_nwp.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
  97. use_cols_v = [col for col in weighted_nwp_v.columns if col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
  98. weighted_nwp[use_cols] /= total_cap
  99. weighted_nwp_h[use_cols] /= total_cap
  100. weighted_nwp[use_cols] = weighted_nwp[use_cols].round(2)
  101. weighted_nwp_h[use_cols] = weighted_nwp_h[use_cols].round(2)
  102. weighted_nwp_v[use_cols_v] /= total_cap
  103. weighted_nwp_v_h[use_cols_v] /= total_cap
  104. weighted_nwp_v[use_cols_v] = weighted_nwp_v[use_cols_v].round(2)
  105. weighted_nwp_v_h[use_cols_v] = weighted_nwp_v_h[use_cols_v].round(2)
  106. data_nwps = types.SimpleNamespace(**{'nwp': weighted_nwp, 'nwp_h': weighted_nwp_h, 'nwp_v': weighted_nwp_v, 'nwp_v_h': weighted_nwp_v_h, 'total_cap': total_cap})
  107. print(f"Final result: {completed} stations trained successfully")
  108. # 区域级功率预测训练
  109. task_config = deepcopy(config)
  110. gpu_id = rc.get_gpu()
  111. task_config['gpu_assignment'] = gpu_id
  112. task.region_task(task_config, data_nwps)
  113. def dq_predict(opt):
  114. # ---------------------------- 配置计算资源和任务 ----------------------------
  115. config = opt.__dict__
  116. # 初始化资源管理器
  117. rc = ResourceController(
  118. max_workers=opt.system['max_workers'],
  119. gpu_list=opt.system['gpu_devices']
  120. )
  121. # 生成任务列表
  122. target_dir = os.path.join(opt.dqyc_base_path, opt.input_file)
  123. all_stations = [str(child.parts[-1]) for child in Path(str(target_dir)).iterdir() if child.is_dir()]
  124. loader = MaterialLoader(target_dir)
  125. task = TaskPre(loader)
  126. # ---------------------------- 监控任务,进度跟踪 ----------------------------
  127. # 场站级功率预测训练
  128. completed = 0
  129. with tqdm(total=len(all_stations)) as pbar:
  130. with ProcessPoolExecutor(max_workers=rc.cpu_cores) as executor:
  131. futures = []
  132. for sid in all_stations:
  133. # 动态分配GPU
  134. task_config = deepcopy(config)
  135. gpu_id = rc.get_gpu()
  136. task_config['gpu_assignment'] = gpu_id
  137. task_config['station_id'] = sid
  138. # 提交任务
  139. future = executor.submit(task.station_task, task_config)
  140. future.add_done_callback(
  141. lambda _: rc.release_gpu(task_config['gpu_assignment']))
  142. futures.append(future)
  143. total_cap = 0
  144. weighted_nwp = pd.DataFrame()
  145. weighted_nwp_h = pd.DataFrame()
  146. weighted_nwp_v = pd.DataFrame()
  147. weighted_nwp_v_h = pd.DataFrame()
  148. # 处理完成情况
  149. for future in concurrent.futures.as_completed(futures):
  150. try:
  151. result = future.result()
  152. if result['status'] == 'success':
  153. # 分治-汇总策略得到加权后的nwp
  154. completed += 1
  155. local = result['weights']
  156. total_cap += local['cap']
  157. weighted_nwp = add_nwp(weighted_nwp, local['nwp'])
  158. weighted_nwp_h = add_nwp(weighted_nwp_h, local['nwp_h'])
  159. weighted_nwp_v = add_nwp(weighted_nwp_v, local['nwp_v'])
  160. weighted_nwp_v_h = add_nwp(weighted_nwp_v_h, local['nwp_v_h'])
  161. pbar.update(1)
  162. pbar.set_postfix_str(f"Completed: {completed}/{len(all_stations)}")
  163. except Exception as e:
  164. print(f"Task failed: {e}")
  165. # 归一化处理
  166. use_cols = [col for col in weighted_nwp.columns if
  167. col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
  168. use_cols_v = [col for col in weighted_nwp_v.columns if
  169. col not in ['PlantID', 'PlantName', 'PlantType', 'Qbsj', 'Datetime']]
  170. weighted_nwp[use_cols] /= total_cap
  171. weighted_nwp_h[use_cols] /= total_cap
  172. weighted_nwp[use_cols] = weighted_nwp[use_cols].round(2)
  173. weighted_nwp_h[use_cols] = weighted_nwp_h[use_cols].round(2)
  174. weighted_nwp_v[use_cols_v] /= total_cap
  175. weighted_nwp_v_h[use_cols_v] /= total_cap
  176. weighted_nwp_v[use_cols_v] = weighted_nwp_v[use_cols_v].round(2)
  177. weighted_nwp_v_h[use_cols_v] = weighted_nwp_v_h[use_cols_v].round(2)
  178. data_nwps = types.SimpleNamespace(
  179. **{'nwp': weighted_nwp, 'nwp_h': weighted_nwp_h, 'nwp_v': weighted_nwp_v, 'nwp_v_h': weighted_nwp_v_h,
  180. 'total_cap': total_cap})
  181. print(f"Final result: {completed} stations trained successfully")
  182. # 区域级功率预测训练
  183. task_config = deepcopy(config)
  184. gpu_id = rc.get_gpu()
  185. task_config['gpu_assignment'] = gpu_id
  186. task.region_task(task_config, data_nwps)
  187. def cdq_train(opt):
  188. pass
  189. def cdq_predict(opt):
  190. pass
  191. def main():
  192. # ---------------------------- 解析参数 ----------------------------
  193. # 解析参数,将固定参数和任务参数合并
  194. opt = parser.parse_args_and_yaml()
  195. # 打印参数
  196. logger.info(f"输入文件目录: {opt.input_file}")
  197. is_dq = opt.input_file.split('/')
  198. if len(is_dq) == 4: # 根据input_file第一个位置参数判断训练还是预测
  199. if opt.train_mode:
  200. dq_train(opt)
  201. else:
  202. dq_predict(opt)
  203. else:
  204. if opt.train_mode:
  205. cdq_train(opt)
  206. else:
  207. cdq_predict(opt)
  208. if __name__ == "__main__":
  209. main()