123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113 |
- import argparse
- import os
- import uuid
- from celery import Celery
- import pandas as pd
- from pathlib import Path
- from app.common.logs import params, logger
- """
- 调用思路
- xxxx 1. 从入口参数中获取IN OUT文件位置 xxxx
- 2. 按照训练和预测加载和解析数据
- 3. 对数据进行预处理
- 4. 执行训练,保存模型,输出状态
- 5. 执行预测,输出结果,输出状态
- """
# ------------------- Celery task configuration -------------------
# Celery application object for this module, backed by Redis.
# SECURITY NOTE(review): pickle is used for task and result serialization.
# Unpickling runs arbitrary code, so the broker/backend must only be reachable
# by trusted producers; consider JSON if payloads allow it.
celery_app = Celery(
    'power_tasks',
    broker='redis://redis:6379/0',  # Redis as the message queue
    backend='redis://redis:6379/1',  # storage for task results
    task_serializer='pickle',
    result_serializer='pickle',
    accept_content=['pickle'],
)
# Cap worker concurrency at 4, but never above the number of CPU cores.
# os.cpu_count() may return None; fall back to the previous fixed value of 4.
celery_app.conf.worker_concurrency = min(4, os.cpu_count() or 4)
celery_app.conf.worker_prefetch_multiplier = 1  # keep tasks from piling up
- """
- 调用思路
- xxxx 1. 从入口参数中获取IN OUT文件位置 xxxx
- 2. 按照训练和预测加载和解析数据
- 3. 对数据进行预处理
- 4. 执行训练,保存模型,输出状态
- 5. 执行预测,输出结果,输出状态
- """
_BASIC_FILE = 'DQYC_IN_BASIC.txt'
_POWER_FILE = 'DQYC_IN_HISTORY_POWER_LONG.txt'
_AREA_BASIC_FILE = 'DQYC_AREA_IN_BASIC'
# (sub-directory, (forecast-weather file, its "_H" companion)).
_NWP_VERSION_SOURCE = (
    '0',
    ('DQYC_IN_FORECAST_WEATHER.txt', 'DQYC_IN_FORECAST_WEATHER_H.txt'),
)  # multi-version weather
_NWP_OWNER_SOURCE = (
    '1',
    ('DQYC_IN_FORECAST_WEATHER_OWNER.txt',
     'DQYC_IN_FORECAST_WEATHER_OWNER_H.txt'),
)  # owner-supplied weather
# PlantType -> (station info, station detail, NWP, NWP "_H", measured weather).
_PLANT_FILES = {
    # wind
    0: ('DQYC_IN_PLANT_WIND.txt', 'DQYC_IN_PLANT_DETAIL_WIND.txt',
        'DQYC_IN_FORECAST_WEATHER_WIND.txt',
        'DQYC_IN_FORECAST_WEATHER_WIND_H.txt',
        'DQYC_IN_ACTUAL_WEATHER_WIND'),
    # solar
    1: ('DQYC_IN_PLANT_SOLAR.txt', 'DQYC_IN_PLANT_DETAIL_SOLAR.txt',
        'DQYC_IN_FORECAST_WEATHER_SOLAR.txt',
        'DQYC_IN_FORECAST_WEATHER_SOLAR_H.txt',
        'DQYC_IN_ACTUAL_WEATHER_SOLAR'),
}


def _read_table(path):
    """Read one whitespace-delimited table whose first row is the header."""
    return pd.read_csv(path, sep=r'\s+', header=0)


def material(input_file, isDq=True):
    """Load the input tables stored in the directory ``input_file``.

    Args:
        input_file: Directory (str or path-like) holding the ``DQYC_IN_*``
            files.
        isDq: When true, the extra forecast-weather pair is loaded from the
            ``0`` sub-directory (multi-version weather) or, if
            ``params['switch_nwp_owner']`` is truthy, from the ``1``
            sub-directory (owner weather).

    Returns:
        For PlantType 0 (wind) or 1 (solar) the tuple
        ``(station_info, station_info_d, nwp, nwp_h, power, nwp_v, nwp_v_h,
        env)``; ``env`` is ``None`` when the measured-weather file does not
        exist. For any other PlantType only the area-level basic table is
        returned.

    Raises:
        ValueError: If the basic table has no ``PlantType`` row.
    """
    root = Path(input_file)

    basic = _read_table(root / _BASIC_FILE)
    power = _read_table(root / _POWER_FILE)

    plant_rows = basic.loc[basic['PropertyID'] == 'PlantType', 'Value']
    if plant_rows.empty:
        raise ValueError("'PlantType' is not present in the PropertyID column")
    plant_type = int(plant_rows.iloc[0])

    # NOTE(review): when isDq is false these two slots are returned as the
    # bare file names rather than tables. That looks accidental, but it is
    # kept as-is for callers that may already depend on it.
    nwp_v, nwp_v_h = _NWP_VERSION_SOURCE[1]
    if isDq:
        # Read only the source that is actually returned, so a missing or
        # broken unused source can no longer fail the whole load.
        if params['switch_nwp_owner']:
            subdir, names = _NWP_OWNER_SOURCE
        else:
            subdir, names = _NWP_VERSION_SOURCE
        nwp_v, nwp_v_h = [_read_table(root / subdir / name) for name in names]

    plant_files = _PLANT_FILES.get(plant_type)
    if plant_files is None:
        # Area-level forecasting is still undecided; it may need to iterate
        # over the stations' data.
        return _read_table(root / _AREA_BASIC_FILE)

    station_name, detail_name, nwp_name, nwp_h_name, env_name = plant_files
    station_info = _read_table(root / station_name)
    station_info_d = _read_table(root / detail_name)
    nwp = _read_table(root / nwp_name)
    nwp_h = _read_table(root / nwp_h_name)

    # Measured weather is optional.
    env_path = root / env_name
    env = _read_table(env_path) if env_path.exists() else None

    return station_info, station_info_d, nwp, nwp_h, power, nwp_v, nwp_v_h, env
def clean_power(power, env, plant_id):
    """Clean the power table with the limited-power cleaner matching ``env``.

    The cleaner is chosen from the columns of ``env``: ``HubSpeed`` selects
    ``app.common.limited_power_wind.LimitPower`` and ``Irradiance`` selects
    ``app.common.limited_power_solar.LimitPower``. The cleaner receives
    ``env`` merged with ``power`` on ``params['col_time']``.

    Args:
        power: Power table.
        env: Measured-weather table, or ``None`` when none is available.
        plant_id: Forwarded to ``LimitPower.clean_limited_power``.

    Returns:
        The result of ``clean_limited_power(plant_id, True)``, or ``power``
        unchanged when ``env`` is ``None`` or has neither marker column.
    """
    # Without measured weather there is nothing to merge with.
    if env is None:
        return power

    # Project imports stay function-local, as in the original code.
    if 'HubSpeed' in env.columns:
        from app.common.limited_power_wind import LimitPower
    elif 'Irradiance' in env.columns:
        from app.common.limited_power_solar import LimitPower
    else:
        return power

    env_power = pd.merge(env, power, on=params['col_time'])
    cleaner = LimitPower(logger, params, env_power)
    return cleaner.clean_limited_power(plant_id, True)
def main():
    """Command-line entry point: queue one task for the given input path.

    Parses ``input_file`` and the optional ``--model_name`` (default
    ``"lstm"``), generates a UUID4 task id, hands the three values to
    ``async_input_handler.delay`` and prints the id.
    """
    cli = argparse.ArgumentParser(description="功率预测程序")
    cli.add_argument("input_file", help="输入文件路径")
    cli.add_argument("--model_name", default="lstm", help="选择短期模型")
    options = cli.parse_args()

    task_id = str(uuid.uuid4())
    # NOTE(review): async_input_handler is not defined in the visible part of
    # this file; presumably a Celery task declared elsewhere. Confirm.
    async_input_handler.delay(task_id, options.input_file, options.model_name)
    print(f"训练任务已提交 | ID: {task_id} | 查看状态: /api/task/{task_id}")


if __name__ == "__main__":
    main()
|