123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- import pandas as pd
- from flask import Flask, request, jsonify
- import time
- import logging
- import traceback
- from common.database_dml import get_data_from_mongo, insert_data_into_mongo
- app = Flask('post_processing——service')
- """
- id = "${id}"
- cap = ${cap}
- 参数
- {
- 'mongodb_database': 'hzh_ftp',
- 'mongodb_read_table': f'{id}_PRED',
- 'mongodb_write_table': f'{id}_PRED',
- 'col_time': "dateTime",
- 'smooth_window': 3,
- 'plant_type': 'solar',
- 'mongodb_nwp_table': f'{id}_NWP_D1'
- }
- """
def get_data(args):
    """Load a table through ``get_data_from_mongo`` and index it by time.

    Args:
        args: Query parameters forwarded unchanged to
            ``get_data_from_mongo``; ``args['col_time']`` names the
            timestamp column.

    Returns:
        The loaded DataFrame with the timestamp column parsed by
        ``pd.to_datetime``, set as the index, and sorted by it.

    Raises:
        ValueError: If the loaded DataFrame is empty.
    """
    frame = get_data_from_mongo(args)
    time_col = args['col_time']
    # Guard clause: nothing to post-process without data.
    if frame.empty:
        raise ValueError("未获取到预测数据。")
    print("预测数据加载成功!")
    frame[time_col] = pd.to_datetime(frame[time_col])
    frame.set_index(time_col, inplace=True)
    frame.sort_index(inplace=True)
    return frame
def predict_result_adjustment(df, args):
    """Post-process a solar / wind power forecast.

    Steps:
        1. Smooth ``power_forecast`` with a centered rolling mean and clip
           it to ``[0, 0.985 * cap]``.
        2. For solar plants, when ``mongodb_nwp_table`` is given: set the
           forecast to 0 wherever the NWP ``radiation`` value is 0, then
           round the forecast to 2 decimals.
        3. Turn the time index back into a string column formatted as
           ``%Y-%m-%d %H:%M:%S``.

    Args:
        df: DataFrame indexed by time (as returned by ``get_data``) with a
            ``power_forecast`` column. It is not modified.
        args: Mapping with ``mongodb_database``, ``plant_type``, ``cap``
            and ``col_time``; optional ``smooth_window`` (default 3) and
            ``mongodb_nwp_table``.

    Returns:
        A new DataFrame holding the adjusted forecast, with the time
        column restored as a regular column.
    """
    mongodb_database = args['mongodb_database']
    plant_type = args['plant_type']
    cap = float(args['cap'])
    col_time = args['col_time']
    smooth_window = int(args.get('smooth_window', 3))

    # Smooth on a copy so the caller's frame is left untouched.
    df_cp = df.copy()
    smoothed = df_cp['power_forecast'].rolling(
        window=smooth_window, min_periods=1, center=True
    ).mean()
    df_cp['power_forecast'] = smoothed.clip(0, 0.985 * cap)
    print("smooth processed")

    # Solar only: zero the forecast where the NWP radiation is zero.
    if plant_type == 'solar' and 'mongodb_nwp_table' in args:
        nwp_param = {
            'mongodb_database': mongodb_database,
            'mongodb_read_table': args['mongodb_nwp_table'],
            'col_time': col_time,
        }
        nwp = get_data(nwp_param)
        df_cp = df_cp.join(nwp['radiation'])
        # Build the mask from the joined column so it shares df_cp's index.
        # A mask taken from ``nwp`` itself is indexed by the NWP table and
        # cannot be aligned when a forecast timestamp is missing from it.
        # Timestamps without NWP data are NaN here, compare False, and are
        # therefore left unchanged.
        no_radiation = df_cp['radiation'] == 0
        df_cp.loc[no_radiation, 'power_forecast'] = 0
        df_cp['power_forecast'] = df_cp['power_forecast'].round(2)
        df_cp.drop(columns=['radiation'], inplace=True)
        print("solar processed")

    df_cp.reset_index(inplace=True)
    df_cp[col_time] = df_cp[col_time].dt.strftime('%Y-%m-%d %H:%M:%S')
    return df_cp
@app.route('/post_process', methods=['POST'])
def data_join():
    """Handle ``POST /post_process``: load, adjust and store forecast data.

    The request parameters are read from ``request.values`` and passed
    unchanged to ``get_data``, ``predict_result_adjustment`` and
    ``insert_data_into_mongo``.

    Returns:
        dict: ``success`` (1 if every step completed, otherwise 0),
        ``args`` (the request parameters), ``start_time`` / ``end_time``
        (local time, ``%Y-%m-%d %H:%M:%S``) and, on failure, ``msg``
        holding the traceback flattened onto one line.
    """
    start_time = time.time()
    result = {}
    success = 0
    # Bound before the try block so the response can always echo it back,
    # even if reading the request parameters fails.
    args = {}
    # Fetched here instead of using the module-level ``logger``, which is
    # only created when this file is run as a script.
    log = logging.getLogger("post_processing")
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        log.info(args)
        df_pre = get_data(args)
        res_df = predict_result_adjustment(df_pre, args)
        insert_data_into_mongo(res_df, args)
        success = 1
    except Exception:
        # Top-level request boundary: report the failure in the response
        # instead of letting it propagate.
        my_exception = traceback.format_exc()
        print(my_exception)
        # str.replace returns a new string, so the result must be kept.
        result['msg'] = my_exception.replace("\n", "\t")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
if __name__ == "__main__":
    # Script entry point: configure logging, then serve the Flask app
    # with waitress on port 10098, listening on all interfaces.
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("post_processing")
    from waitress import serve

    # serve() blocks until the server stops, so announce startup first;
    # printed afterwards, the message would only appear at shutdown.
    print("server start!")
    serve(app, host="0.0.0.0", port=10098)
|