import logging
import time
import traceback
from io import StringIO

import pandas as pd
from flask import Flask, request, jsonify, g

from common.database_dml import get_data_from_mongo, insert_data_into_mongo
from common.log_utils import init_request_logging, teardown_request_logging

# Bound at import time so the request hooks and helpers below can log even
# when this module is imported by a WSGI server instead of being run as a
# script (the ``__main__`` block fetches this same named logger).
logger = logging.getLogger("post_process")

app = Flask('post_process——service')


# Set up log capturing before each request.
@app.before_request
def setup_logging():
    """Initialise per-request logging on the module logger."""
    init_request_logging(logger)


# Clean up the log handler after each request.
@app.after_request
def teardown_logging(response):
    """Tear down per-request logging and pass the response through the helper."""
    return teardown_request_logging(response, logger)


def get_data(args):
    """Load a table from MongoDB and index it by its time column.

    Args:
        args: Mapping forwarded unchanged to ``get_data_from_mongo``. This
            function itself reads ``args['col_time']`` (name of the time
            column) and ``args['mongodb_read_table']`` (for the log message).

    Returns:
        The loaded DataFrame with ``col_time`` parsed to datetimes, set as the
        index and sorted ascending.

    Raises:
        ValueError: If the query returned an empty DataFrame.
    """
    df = get_data_from_mongo(args)
    col_time = args['col_time']
    if df.empty:
        raise ValueError("未获取到预测数据。")

    logger.info(f"{args['mongodb_read_table']} load success")
    df[col_time] = pd.to_datetime(df[col_time])
    df.set_index(col_time, inplace=True)
    df.sort_index(inplace=True)
    return df


def predict_result_adjustment(df, args):
    """Post-process solar / wind power forecasts.

    Main operations:
        1. Solar: smoothing, then zeroing where NWP radiation is 0 (night).
        2. Wind: smoothing only.
        3. Clipping of the smoothed forecast to ``[0, 0.985 * cap]``.

    Args:
        df: Forecast DataFrame indexed by time (as returned by ``get_data``)
            containing a ``power_forecast`` column.
        args: Mapping with ``mongodb_database``, ``plant_type``, ``cap`` and
            ``col_time``; optionally ``smooth_window`` (defaults to 3). For
            ``plant_type == 'solar'`` the night zeroing additionally needs
            ``mongodb_nwp_table`` and ``radiation`` (name of the radiation
            column in the NWP table).

    Returns:
        A new DataFrame (the input is not modified) with ``power_forecast``
        rounded to 2 decimals and ``col_time`` restored as a column formatted
        as ``'%Y-%m-%d %H:%M:%S'`` strings.
    """
    mongodb_database = args['mongodb_database']
    plant_type = args['plant_type']
    cap = float(args['cap'])
    col_time = args['col_time']
    smooth_window = int(args.get('smooth_window', 3))

    # Smoothing: centred rolling mean, then clip to the allowed power range.
    df_cp = df.copy()
    df_cp['power_forecast'] = (
        df_cp['power_forecast']
        .rolling(window=smooth_window, min_periods=1, center=True)
        .mean()
        .clip(0, 0.985 * cap)
    )
    logger.info(f"smooth processed windows: {smooth_window}")

    # Solar plants: force the forecast to zero at night.
    if plant_type == 'solar' and 'mongodb_nwp_table' in args:
        nwp_param = {
            'mongodb_database': mongodb_database,
            'mongodb_read_table': args['mongodb_nwp_table'],
            'col_time': col_time,
        }
        col_radiation = args['radiation']
        nwp = get_data(nwp_param)
        df_cp = df_cp.join(nwp[col_radiation])
        # Build the mask from the joined column so it shares df_cp's index;
        # a mask taken from `nwp` fails whenever the two time indexes differ.
        # Rows without a matching NWP record (NaN radiation) are left as is.
        df_cp.loc[df_cp[col_radiation] == 0, 'power_forecast'] = 0
        # Drop the helper column by its configured name, not a fixed literal.
        df_cp.drop(columns=[col_radiation], inplace=True)
        logger.info("solar processed")

    df_cp['power_forecast'] = df_cp['power_forecast'].round(2)
    df_cp.reset_index(inplace=True)
    df_cp[col_time] = df_cp[col_time].dt.strftime('%Y-%m-%d %H:%M:%S')
    return df_cp
@app.route('/post_process', methods=['POST'])
def data_join():
    """Handle ``POST /post_process``: load, adjust and store forecast data.

    The request values are used as the argument mapping for ``get_data``,
    ``predict_result_adjustment`` and ``insert_data_into_mongo``. Any
    exception is logged with its traceback and reported through
    ``success = 0`` rather than propagated.

    Returns:
        dict with the keys ``success`` (1 or 0), ``args`` (the request
        values), ``log`` (lines read from ``g.log_stream``), ``start_time``
        and ``end_time`` (local time, ``'%Y-%m-%d %H:%M:%S'``).
    """
    # Record the program start time.
    start_time = time.time()
    result = {}
    success = 0
    # Pre-bind so the response can still be built if reading the request
    # values itself fails inside the try block.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        df_pre = get_data(args)
        res_df = predict_result_adjustment(df_pre, args)
        insert_data_into_mongo(res_df, args)
        success = 1
    except Exception:
        # Top-level request boundary: log the full traceback and report the
        # failure in the response body instead of raising.
        my_exception = traceback.format_exc()
        logger.error(my_exception)
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    # NOTE(review): g.log_stream is presumably attached by
    # init_request_logging in the before_request hook — confirm.
    result['log'] = g.log_stream.getvalue().splitlines()
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("post_process")
    from waitress import serve

    # serve() blocks until the server stops, so announce start-up first.
    print("server start!")
    serve(app, host="0.0.0.0", port=10130)