post_process.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. import pandas as pd
  2. from flask import Flask, request, jsonify
  3. import time
  4. import logging
  5. import traceback
  6. from common.database_dml import get_data_from_mongo, insert_data_into_mongo
  7. app = Flask('post_processing——service')
  8. """
  9. id = "${id}"
  10. cap = ${cap}
  11. 参数
  12. {
  13. 'mongodb_database': 'hzh_ftp',
  14. 'mongodb_read_table': f'{id}_PRED',
  15. 'mongodb_write_table': f'{id}_PRED',
  16. 'col_time': "dateTime",
  17. 'smooth_window': 3
  18. 'plant_type': 'solar',
  19. 'mongodb_nwp_table': f'{id}_NWP_D1'
  20. }
  21. """
  22. def get_data(args):
  23. df = get_data_from_mongo(args)
  24. col_time = args['col_time']
  25. if not df.empty:
  26. print("预测数据加载成功!")
  27. df[col_time] = pd.to_datetime(df[col_time])
  28. df.set_index(col_time, inplace=True)
  29. df.sort_index(inplace=True)
  30. else:
  31. raise ValueError("未获取到预测数据。")
  32. return df
  33. def predict_result_adjustment(df, args):
  34. """
  35. 光伏/风电 数据后处理 主要操作
  36. 1. 光伏 (夜间 置零 + 平滑)
  37. 2. 风电 (平滑)
  38. 3. cap 封顶
  39. """
  40. mongodb_database, plant_type, cap, col_time = args['mongodb_database'], args['plant_type'], float(args['cap']), \
  41. args['col_time']
  42. if 'smooth_window' in args.keys():
  43. smooth_window = int(args['smooth_window'])
  44. else:
  45. smooth_window = 3
  46. # 平滑
  47. df_cp = df.copy()
  48. df_cp['power_forecast'] = df_cp['power_forecast'].rolling(window=smooth_window, min_periods=1,
  49. center=True).mean().clip(0, 0.985 * cap)
  50. print("smooth processed")
  51. # 光伏晚上置零
  52. if plant_type == 'solar' and 'mongodb_nwp_table' in args.keys():
  53. nwp_param = {
  54. 'mongodb_database': mongodb_database,
  55. 'mongodb_read_table': args['mongodb_nwp_table'],
  56. 'col_time': col_time
  57. }
  58. nwp = get_data(nwp_param)
  59. df_cp = df_cp.join(nwp['radiation'])
  60. df_cp.loc[nwp['radiation'] == 0, 'power_forecast'] = 0
  61. df_cp['power_forecast'] = round(df_cp['power_forecast'], 2)
  62. df_cp.drop(columns=['radiation'], inplace=True)
  63. print("solar processed")
  64. df_cp.reset_index(inplace=True)
  65. df_cp[col_time] = df_cp[col_time].dt.strftime('%Y-%m-%d %H:%M:%S')
  66. return df_cp
  67. @app.route('/post_process', methods=['POST'])
  68. def data_join():
  69. # 获取程序开始时间
  70. start_time = time.time()
  71. result = {}
  72. success = 0
  73. print("Program starts execution!")
  74. try:
  75. args = request.values.to_dict()
  76. print('args', args)
  77. logger.info(args)
  78. df_pre = get_data(args)
  79. res_df = predict_result_adjustment(df_pre, args)
  80. insert_data_into_mongo(res_df, args)
  81. success = 1
  82. except Exception as e:
  83. my_exception = traceback.format_exc()
  84. print(my_exception)
  85. my_exception.replace("\n", "\t")
  86. result['msg'] = my_exception
  87. end_time = time.time()
  88. result['success'] = success
  89. result['args'] = args
  90. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  91. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  92. print("Program execution ends!")
  93. return result
  94. if __name__ == "__main__":
  95. print("Program starts execution!")
  96. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  97. logger = logging.getLogger("post_processing")
  98. from waitress import serve
  99. serve(app, host="0.0.0.0", port=10098)
  100. print("server start!")