data_join.py 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869
  1. import pandas as pd
  2. from flask import Flask,request,jsonify
  3. import time
  4. import logging
  5. import traceback
  6. from functools import reduce
  7. from common.database_dml import get_df_list_from_mongo,insert_data_into_mongo
  8. app = Flask('data_join——service')
  9. @app.route('/hello', methods=['POST'])
  10. def hello():
  11. return jsonify(message='Hello, World!')
  12. #1.AGC/AVC信号判断限电(有的场站准 有的不准) 1种方法 数据库数据有问题 暂时用不了
  13. def data_merge(df_list, args):
  14. join_key,join_type,features = args['join_key'], args['join_type'], str_to_list(args['col_reserve'])
  15. result = reduce(lambda left, right: pd.merge(left, right, how=join_type, on=join_key), df_list)
  16. if len(features)==0:
  17. return result
  18. else:
  19. return result[features]
  20. @app.route('/data_join', methods=['POST'])
  21. def data_join():
  22. # 获取程序开始时间
  23. start_time = time.time()
  24. result = {}
  25. success = 0
  26. print("Program starts execution!")
  27. try:
  28. args = request.values.to_dict()
  29. print('args',args)
  30. logger.info(args)
  31. df_list = get_df_list_from_mongo(args)
  32. res_df = data_merge(df_list,args)
  33. insert_data_into_mongo(res_df,args)
  34. success = 1
  35. except Exception as e:
  36. my_exception = traceback.format_exc()
  37. my_exception.replace("\n","\t")
  38. result['msg'] = my_exception
  39. end_time = time.time()
  40. result['success'] = success
  41. result['args'] = args
  42. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  43. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  44. print("Program execution ends!")
  45. return result
  46. def str_to_list(arg):
  47. if arg == '':
  48. return []
  49. else:
  50. return arg.split(',')
  51. if __name__=="__main__":
  52. print("Program starts execution!")
  53. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  54. logger = logging.getLogger("data_join")
  55. from waitress import serve
  56. serve(app, host="0.0.0.0", port=10094)
  57. print("server start!")