post_processing.py 2.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677
  1. import pandas as pd
  2. from flask import Flask,request,jsonify
  3. import time
  4. import logging
  5. import traceback
  6. from common.database_dml import get_data_from_mongo,insert_data_into_mongo
# Flask application object; the route decorators in this file register on it.
# NOTE(review): the name contains two full-width dashes ("——"), which looks
# like an input-method slip for "--" or "_" — confirm before changing it,
# since Flask receives this string as the application's import name.
app = Flask('post_processing——service')
  8. @app.route('/hello', methods=['POST'])
  9. def hello():
  10. return jsonify(message='Hello, World!')
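# 1. Detect power curtailment from the AGC/AVC signal (accurate at some
#    stations, inaccurate at others). One possible method; the database data
#    is faulty, so it cannot be used for now.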
  12. def predict_result_adjustment(df, args):
  13. params = eval(args['params'])
  14. df_res = pd.DataFrame()
  15. model_names = params['new_names'].keys()
  16. for model in params['adjustments']:
  17. if model in model_names:
  18. new_name = params['new_names'][model]
  19. else:
  20. new_name = model
  21. rates = params['adjustments'][model].keys()
  22. df_tmp = df[df['model']==model]
  23. df_tmp['hour'] = df_tmp['dateTime'].apply(lambda x:x[11:13])
  24. df_tmp['model'] = new_name
  25. for rate in rates:
  26. hours = params['adjustments'][model][rate]
  27. # 提取小时部分(格式化为两位字符串
  28. df_tmp.loc[df_tmp['hour'].isin(hours), 'predict'] *= float(rate)
  29. # 删除临时的 hour 列
  30. df_res = pd.concat([df_res,df_tmp],ignore_index=True)
  31. return df_res.drop('hour', axis=1)
  32. @app.route('/post_processing', methods=['POST'])
  33. def data_join():
  34. # 获取程序开始时间
  35. start_time = time.time()
  36. result = {}
  37. success = 0
  38. print("Program starts execution!")
  39. try:
  40. args = request.values.to_dict()
  41. print('args',args)
  42. logger.info(args)
  43. df_pre = get_data_from_mongo(args)
  44. res_df = predict_result_adjustment(df_pre, args)
  45. insert_data_into_mongo(res_df,args)
  46. success = 1
  47. except Exception as e:
  48. my_exception = traceback.format_exc()
  49. my_exception.replace("\n","\t")
  50. result['msg'] = my_exception
  51. end_time = time.time()
  52. result['success'] = success
  53. result['args'] = args
  54. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  55. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  56. print("Program execution ends!")
  57. return result
  58. if __name__=="__main__":
  59. print("Program starts execution!")
  60. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  61. logger = logging.getLogger("post_processing")
  62. from waitress import serve
  63. serve(app, host="0.0.0.0", port=10098)
  64. print("server start!")