post_process.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. import pandas as pd
  2. from flask import Flask, request, jsonify, g
  3. import time
  4. import logging
  5. import traceback
  6. from io import StringIO
  7. from common.database_dml import get_data_from_mongo, insert_data_into_mongo
  8. from common.log_utils import init_request_logging, teardown_request_logging
  9. app = Flask('post_process——service')
  10. # 请求前设置日志捕获
  11. @app.before_request
  12. def setup_logging():
  13. init_request_logging(logger)
  14. # 请求后清理日志处理器
  15. @app.after_request
  16. def teardown_logging(response):
  17. return teardown_request_logging(response, logger)
  18. def get_data(args):
  19. df = get_data_from_mongo(args)
  20. col_time = args['col_time']
  21. if not df.empty:
  22. logger.info(f"{args['mongodb_read_table']} load success")
  23. df[col_time] = pd.to_datetime(df[col_time])
  24. df.set_index(col_time, inplace=True)
  25. df.sort_index(inplace=True)
  26. else:
  27. raise ValueError("未获取到预测数据。")
  28. return df
  29. def predict_result_adjustment(df, args, col_radiation="radiation"):
  30. """
  31. 光伏/风电 数据后处理 主要操作
  32. 1. 光伏 (夜间 置零 + 平滑)
  33. 2. 风电 (平滑)
  34. 3. cap 封顶
  35. """
  36. mongodb_database, plant_type, cap, col_time = args['mongodb_database'], args['plant_type'], float(args['cap']), \
  37. args['col_time']
  38. if 'smooth_window' in args.keys():
  39. smooth_window = int(args['smooth_window'])
  40. else:
  41. smooth_window = 3
  42. # 平滑
  43. df_cp = df.copy()
  44. df_cp['power_forecast'] = df_cp['power_forecast'].rolling(window=smooth_window, min_periods=1,
  45. center=True).mean().clip(0, 0.985 * cap)
  46. logger.info(f"smooth processed windows: {smooth_window}")
  47. # 光伏晚上置零
  48. if plant_type == 'solar' and 'mongodb_nwp_table' in args.keys():
  49. nwp_param = {
  50. 'mongodb_database': mongodb_database,
  51. 'mongodb_read_table': args['mongodb_nwp_table'],
  52. 'col_time': col_time
  53. }
  54. col_radiation = args['col_radiation']
  55. nwp = get_data(nwp_param)
  56. df_cp = df_cp.join(nwp[col_radiation])
  57. df_cp.loc[nwp[col_radiation] == 0, 'power_forecast'] = 0
  58. df_cp.drop(columns=[col_radiation], inplace=True)
  59. logger.info("solar processed")
  60. df_cp['power_forecast'] = round(df_cp['power_forecast'], 2)
  61. df_cp.reset_index(inplace=True)
  62. df_cp[col_time] = df_cp[col_time].dt.strftime('%Y-%m-%d %H:%M:%S')
  63. return df_cp
  64. @app.route('/post_process', methods=['POST'])
  65. def data_join():
  66. # 获取程序开始时间
  67. start_time = time.time()
  68. result = {}
  69. success = 0
  70. print("Program starts execution!")
  71. try:
  72. args = request.values.to_dict()
  73. print('args', args)
  74. logger.info(args)
  75. df_pre = get_data(args)
  76. res_df = predict_result_adjustment(df_pre, args)
  77. insert_data_into_mongo(res_df, args)
  78. success = 1
  79. except Exception as e:
  80. my_exception = traceback.format_exc()
  81. logger.error(my_exception)
  82. end_time = time.time()
  83. result['success'] = success
  84. result['args'] = args
  85. result['log'] = g.log_stream.getvalue().splitlines()
  86. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  87. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  88. print("Program execution ends!")
  89. return result
  90. if __name__ == "__main__":
  91. print("Program starts execution!")
  92. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  93. logger = logging.getLogger("post_process")
  94. from waitress import serve
  95. serve(app, host="0.0.0.0", port=10130)
  96. print("server start!")