data_join.py 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566
  1. import pandas as pd
  2. from flask import Flask,request,jsonify
  3. import time
  4. import logging
  5. import traceback
  6. from functools import reduce
  7. from common.database_dml import get_df_list_from_mongo,insert_data_into_mongo
# Flask application object; the route handlers in this file register on it
# through @app.route.
app = Flask('data_join——service')
  9. @app.route('/hello', methods=['POST'])
  10. def hello():
  11. return jsonify(message='Hello, World!')
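# 1. Detect power curtailment from the AGC/AVC signal (accurate for some stations, inaccurate for others). Method 1: the database data is faulty, so this cannot be used for now.
# NOTE(review): this comment does not describe the function below; it looks left over from another module - confirm and remove.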
  13. def data_merge(df_list, args):
  14. join_key,join_type,features = args['join_key'], args['join_type'], str_to_list(args['col_reserve'])
  15. result = reduce(lambda left, right: pd.merge(left, right, how=join_type, on=join_key), df_list)
  16. return result[features]
  17. @app.route('/data_join', methods=['POST'])
  18. def data_join():
  19. # 获取程序开始时间
  20. start_time = time.time()
  21. result = {}
  22. success = 0
  23. print("Program starts execution!")
  24. try:
  25. args = request.values.to_dict()
  26. print('args',args)
  27. logger.info(args)
  28. df_list = get_df_list_from_mongo(args)
  29. res_df = data_merge(df_list,args)
  30. insert_data_into_mongo(res_df,args)
  31. success = 1
  32. except Exception as e:
  33. my_exception = traceback.format_exc()
  34. my_exception.replace("\n","\t")
  35. result['msg'] = my_exception
  36. end_time = time.time()
  37. result['success'] = success
  38. result['args'] = args
  39. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  40. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  41. print("Program execution ends!")
  42. return result
  43. def str_to_list(arg):
  44. if arg == '':
  45. return []
  46. else:
  47. return arg.split(',')
  48. if __name__=="__main__":
  49. print("Program starts execution!")
  50. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  51. logger = logging.getLogger("data_join")
  52. from waitress import serve
  53. serve(app, host="0.0.0.0", port=10094)
  54. print("server start!")