# pre_post_processing.py (1.9 KB)
  1. import pandas as pd
  2. from flask import Flask,request,jsonify
  3. import time
  4. import logging
  5. import traceback
  6. from common.database_dml import get_data_from_mongo,insert_data_into_mongo
  # Flask application object; the @app.route decorators in this file register
  # their handlers on it.
  # NOTE(review): the name contains two non-ASCII dash characters ("——"),
  # possibly a typo for "_" — confirm before changing, since Flask receives
  # this string as the application's import name.
  app = Flask('post_processing——service')
  @app.route('/hello', methods=['POST'])
  def hello():
      """Answer POST /hello with a fixed JSON greeting."""
      greeting = {'message': 'Hello, World!'}
      return jsonify(greeting)
  def predict_result_adjustment(df, args):
      """Scale one column of ``df`` by per-hour multipliers.

      Args:
          df: DataFrame holding the values to adjust. It is left untouched;
              the adjustment is applied to a copy.
          args: Mapping with these entries:
              ``col_time``: name of the timestamp column. The hour is taken
                  from characters 11-12 of each value, so values are assumed
                  to be strings shaped like ``YYYY-MM-DD HH:MM:SS``
                  (TODO confirm against the data source).
              ``col_adjust``: name of the numeric column to scale.
              ``ratio``: a dict, or a string containing a Python dict
                  literal, mapping a multiplier (number or numeric string)
                  to the collection of hour strings it applies to, e.g.
                  ``"{'0.9': ['05', '06'], '1.1': ['12']}"``.

      Returns:
          A new DataFrame with the same columns as ``df`` in which every row
          whose hour is listed for a multiplier has ``col_adjust`` multiplied
          by it. Multipliers are applied in dict order, so an hour listed
          under several multipliers is scaled by each of them in turn.

      Raises:
          KeyError: if a required entry of ``args`` or a named column is
              missing.
          ValueError, SyntaxError: if ``ratio`` is not a valid dict literal.
      """
      import ast  # local import keeps this function self-contained

      col_time = args['col_time']
      col_adjust = args['col_adjust']

      # SECURITY: ``ratio`` is taken from the HTTP request parameters, so it
      # is untrusted. This used to be eval(), which would execute arbitrary
      # code sent by a client; literal_eval only accepts Python literals.
      raw_ratio = args['ratio']
      if isinstance(raw_ratio, dict):
          ratio_dict = raw_ratio
      else:
          ratio_dict = ast.literal_eval(raw_ratio)
      if not isinstance(ratio_dict, dict):
          raise ValueError("'ratio' must be a dict literal mapping rate -> hours")

      # Work on a copy so the caller's frame is not mutated.
      adjusted = df.copy()

      # Keep the hour in a separate Series rather than a temporary 'hour'
      # column: a temporary column would overwrite, and then drop, any real
      # column of that name in the data.
      hour_of_row = adjusted[col_time].apply(lambda ts: ts[11:13])

      for rate, hours in ratio_dict.items():
          adjusted.loc[hour_of_row.isin(hours), col_adjust] *= float(rate)
      return adjusted
  @app.route('/pre_post_processing', methods=['POST'])
  def data_join():
      """Handle POST /pre_post_processing.

      Reads the request parameters into a dict, loads a DataFrame with
      ``get_data_from_mongo(args)``, adjusts it with
      ``predict_result_adjustment`` and hands the result to
      ``insert_data_into_mongo``. Any exception raised along the way is caught
      and reported in the response instead of propagating.

      Returns:
          dict with the keys ``success`` (1 on success, 0 on failure),
          ``args`` (the request parameters, or ``{}`` if they could not be
          read), ``start_time`` and ``end_time`` (local time formatted as
          ``YYYY-MM-DD HH:MM:SS``) and, on failure only, ``msg`` (the
          traceback with newlines replaced by tabs).
      """
      # Record the program start time.
      start_time = time.time()
      result = {}
      success = 0
      # Bound before the try block so the response can always echo it, even
      # when reading the request parameters is what fails.
      args = {}
      # Fetched here instead of relying on the module-level ``logger``, which
      # is only assigned when this file is run as a script; under any other
      # launcher that name would be undefined and every request would fail.
      logger = logging.getLogger("post_processing")
      print("Program starts execution!")
      try:
          args = request.values.to_dict()
          print('args', args)
          logger.info(args)
          df_pre = get_data_from_mongo(args)
          res_df = predict_result_adjustment(df_pre, args)
          insert_data_into_mongo(res_df, args)
          success = 1
      except Exception:
          # Top-level request boundary: report the failure to the caller.
          # str.replace returns a new string, so its result must be kept;
          # previously it was discarded and the newlines stayed in place.
          result['msg'] = traceback.format_exc().replace("\n", "\t")
          logger.exception("pre_post_processing request failed")
      end_time = time.time()
      result['success'] = success
      result['args'] = args
      result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
      result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
      print("Program execution ends!")
      return result
  if __name__ == "__main__":
      # Script entry point: configure logging, then serve the Flask app with
      # waitress on all interfaces, port 10107.
      print("Program starts execution!")
      logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
      # Bound at module level so other code in this file can refer to `logger`.
      logger = logging.getLogger("post_processing")
      # Imported here so waitress is only required when running as a script.
      from waitress import serve
      # Announce start-up before calling serve(): serve() blocks until the
      # server shuts down, so a message placed after it would only appear
      # once the server has stopped.
      print("server start!")
      serve(app, host="0.0.0.0", port=10107)