main430.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. import argparse
  2. import os
  3. import uuid
  4. from celery import Celery
  5. import pandas as pd
  6. from pathlib import Path
  7. from app.common.logs import params, logger
"""
调用思路
xxxx 1. 从入口参数中获取IN OUT文件位置 xxxx
2. 按照训练和预测加载和解析数据
3. 对数据进行预处理
4. 执行训练,保存模型,输出状态
5. 执行预测,输出结果,输出状态
"""
  16. # ------------------- Celery任务配置 -------------------
  17. celery_app = Celery(
  18. 'power_tasks',
  19. broker='redis://redis:6379/0', # 使用Redis作为消息队列
  20. backend='redis://redis:6379/1', # 任务结果存储
  21. task_serializer='pickle',
  22. result_serializer='pickle',
  23. accept_content=['pickle']
  24. )
  25. # 动态控制并发数(最大不超过CPU核数)
  26. celery_app.conf.worker_concurrency = 4
  27. celery_app.conf.worker_prefetch_multiplier = 1 # 防止任务堆积
"""
调用思路
xxxx 1. 从入口参数中获取IN OUT文件位置 xxxx
2. 按照训练和预测加载和解析数据
3. 对数据进行预处理
4. 执行训练,保存模型,输出状态
5. 执行预测,输出结果,输出状态
"""
  36. def material(input_file, isDq=True):
  37. basi, station_info_w, station_info_d_w, station_info_s, station_info_d_s, nwp_w, nwp_s, nwp_w_h, nwp_s_h, power = (
  38. 'DQYC_IN_BASIC.txt', 'DQYC_IN_PLANT_WIND.txt', 'DQYC_IN_PLANT_DETAIL_WIND.txt', 'DQYC_IN_PLANT_SOLAR.txt',
  39. 'DQYC_IN_PLANT_DETAIL_SOLAR.txt', 'DQYC_IN_FORECAST_WEATHER_WIND.txt', 'DQYC_IN_FORECAST_WEATHER_SOLAR.txt',
  40. 'DQYC_IN_FORECAST_WEATHER_WIND_H.txt', 'DQYC_IN_FORECAST_WEATHER_SOLAR_H.txt', 'DQYC_IN_HISTORY_POWER_LONG.txt')
  41. basi_area = 'DQYC_AREA_IN_BASIC'
  42. nwp_v, nwp_v_h = 'DQYC_IN_FORECAST_WEATHER.txt', 'DQYC_IN_FORECAST_WEATHER_H.txt' # 多版本气象
  43. nwp_own, nwp_own_h = 'DQYC_IN_FORECAST_WEATHER_OWNER.txt', 'DQYC_IN_FORECAST_WEATHER_OWNER_H.txt', # 自有气象
  44. env_wf, env_sf = 'DQYC_IN_ACTUAL_WEATHER_WIND', 'DQYC_IN_ACTUAL_WEATHER_SOLAR' # 实测气象
  45. input_file = Path(input_file)
  46. env_w, env_s = None, None
  47. basic = pd.read_csv(input_file / basi, sep=r'\s+', header=0)
  48. power = pd.read_csv(input_file / power, sep=r'\s+', header=0)
  49. plant_type = int(basic.loc[basic['PropertyID'].to_list().index(('PlantType')), 'Value'])
  50. if isDq:
  51. nwp_v = pd.read_csv(input_file / '0' / nwp_v, sep=r'\s+', header=0)
  52. nwp_v_h = pd.read_csv(input_file / '0' / nwp_v_h, sep=r'\s+', header=0)
  53. nwp_own = pd.read_csv(input_file / '1' / nwp_own, sep=r'\s+', header=0)
  54. nwp_own_h = pd.read_csv(input_file / '1' / nwp_own_h, sep=r'\s+', header=0)
  55. if params['switch_nwp_owner']:
  56. nwp_v, nwp_v_h = nwp_own, nwp_own_h
  57. # 如果是风电
  58. if plant_type == 0:
  59. station_info = pd.read_csv(input_file / station_info_w, sep=r'\s+', header=0)
  60. station_info_d = pd.read_csv(input_file / station_info_d_w, sep=r'\s+', header=0)
  61. nwp = pd.read_csv(input_file / nwp_w, sep=r'\s+', header=0)
  62. nwp_h = pd.read_csv(input_file / nwp_w_h, sep=r'\s+', header=0)
  63. if (input_file / env_wf).exists():
  64. env_w = pd.read_csv(input_file / env_wf, sep=r'\s+', header=0)
  65. return station_info, station_info_d, nwp, nwp_h, power, nwp_v, nwp_v_h, env_w
  66. # 如果是光伏
  67. elif plant_type == 1:
  68. station_info = pd.read_csv(input_file / station_info_s, sep=r'\s+', header=0)
  69. station_info_d = pd.read_csv(input_file / station_info_d_s, sep=r'\s+', header=0)
  70. nwp = pd.read_csv(input_file / nwp_s, sep=r'\s+', header=0)
  71. nwp_h = pd.read_csv(input_file / nwp_s_h, sep=r'\s+', header=0)
  72. if (input_file / env_sf).exists():
  73. env_s = pd.read_csv(input_file / env_sf, sep=r'\s+', header=0)
  74. return station_info, station_info_d, nwp, nwp_h, power, nwp_v, nwp_v_h, env_s
  75. else:
  76. # 区域级预测待定,可能需要遍历获取场站数据
  77. basic_area = pd.read_csv(input_file / basi_area, sep=r'\s+', header=0)
  78. return basic_area
  79. def clean_power(power, env, plant_id):
  80. env_power = pd.merge(env, power, on=params['col_time'])
  81. if 'HubSpeed' in env.columns.tolist():
  82. from app.common.limited_power_wind import LimitPower
  83. lp = LimitPower(logger, params, env_power)
  84. power = lp.clean_limited_power(plant_id, True)
  85. elif 'Irradiance' in env.columns.tolist():
  86. from app.common.limited_power_solar import LimitPower
  87. lp = LimitPower(logger, params, env_power)
  88. power = lp.clean_limited_power(plant_id, True)
  89. return power
  90. def main():
  91. """命令行入口(批量提交任务到队列)"""
  92. parser = argparse.ArgumentParser(description="功率预测程序")
  93. parser.add_argument("input_file", help="输入文件路径")
  94. parser.add_argument("--model_name", default="lstm", help="选择短期模型")
  95. args = parser.parse_args()
  96. task_id = str(uuid.uuid4())
  97. async_input_handler.delay(task_id, args.input_file, args.model_name)
  98. print(f"训练任务已提交 | ID: {task_id} | 查看状态: /api/task/{task_id}")
  99. if __name__ == "__main__":
  100. main()