data_join.py 3.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. import pandas as pd
  2. from pymongo import MongoClient
  3. from flask import Flask,request,jsonify
  4. import time
  5. import logging
  6. import traceback
  7. from functools import reduce
  # Flask application object; the route decorators in this file register on it.
  app = Flask('data_join——service')
  # Module-level logger: the request handler references `logger`, which was
  # previously bound only inside the `__main__` guard and therefore missing
  # whenever this module is imported (e.g. by a WSGI server) instead of run.
  logger = logging.getLogger("data_join")
  @app.route('/hello', methods=['POST'])
  def hello():
      """Answer POST /hello with a fixed JSON greeting."""
      greeting = {'message': 'Hello, World!'}
      return jsonify(**greeting)
  def get_data_from_mongo(args):
      """Load each requested MongoDB collection into its own DataFrame.

      Args:
          args: Mapping with the keys ``mongodb_database`` (database name) and
              ``mongodb_read_table`` (comma-separated collection names).

      Returns:
          A list of ``pandas.DataFrame`` objects, one per collection name, in
          the order the names appear in ``mongodb_read_table``.

      Raises:
          KeyError: If either required key is missing from ``args``.
      """
      # NOTE(review): the connection string, including credentials, is
      # hard-coded here; consider moving it to configuration.
      mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
      mongodb_database = args['mongodb_database']
      mongodb_read_table = args['mongodb_read_table'].split(',')

      df_list = []
      client = MongoClient(mongodb_connection)
      try:
          # Select the database (MongoDB creates it automatically if it does
          # not exist yet).
          db = client[mongodb_database]
          for table in mongodb_read_table:
              collection = db[table]  # collection name
              # find() returns a cursor; drain it into a list so pandas can
              # build the DataFrame from every document.
              data_from_db = collection.find()
              df_list.append(pd.DataFrame(list(data_from_db)))
      finally:
          # Close the connection even when a read fails part-way through.
          client.close()
      return df_list
  def insert_data_into_mongo(res_df, args):
      """Replace the target MongoDB collection with the rows of ``res_df``.

      If the target collection already exists it is dropped first, then every
      row of ``res_df`` is inserted as one document.

      Args:
          res_df: DataFrame whose rows are written to MongoDB.
          args: Mapping with the keys ``mongodb_database`` (database name) and
              ``mongodb_write_table`` (target collection name).

      Raises:
          KeyError: If either required key is missing from ``args``.
      """
      # NOTE(review): the connection string, including credentials, is
      # hard-coded here; consider moving it to configuration.
      mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
      mongodb_database = args['mongodb_database']
      mongodb_write_table = args['mongodb_write_table']

      client = MongoClient(mongodb_connection)
      try:
          db = client[mongodb_database]
          if mongodb_write_table in db.list_collection_names():
              db[mongodb_write_table].drop()
              print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
          collection = db[mongodb_write_table]  # collection name
          # Convert the DataFrame to a list of dicts, one dict per row.
          data_dict = res_df.to_dict("records")
          # NOTE(review): presumably insert_many rejects an empty list, in
          # which case an empty res_df fails here after the old collection was
          # already dropped - confirm this is the desired outcome.
          collection.insert_many(data_dict)
          print("data inserted successfully!")
      finally:
          # The connection used to be left open; always release it.
          client.close()
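  39. # 1. Detect power curtailment from AGC/AVC signals (accurate at some stations, inaccurate at others): one method; the database data is faulty, so it cannot be used for now.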
  def data_merge(df_list, args):
      """Fold a list of DataFrames into one by merging them left to right.

      Args:
          df_list: DataFrames to combine; the first one is the initial left side.
          args: Mapping with ``join_key`` (passed to ``pd.merge`` as ``on``) and
              ``join_type`` (passed as ``how``).

      Returns:
          The merged DataFrame. A single-element list yields that element
          unchanged, because no merge is performed.
      """
      merge_on = args['join_key']
      merge_how = args['join_type']

      def _join_pair(accumulated, incoming):
          # Merge the running result with the next frame in the list.
          return pd.merge(left=accumulated, right=incoming, how=merge_how, on=merge_on)

      return reduce(_join_pair, df_list)
  @app.route('/data_join', methods=['POST'])
  def data_join():
      """Join the MongoDB collections named in the request and store the result.

      Reads the request parameters, loads the source collections, merges them
      and writes the merged rows to the target collection. Failures are
      reported in the response body instead of being raised.

      Returns:
          A dict with ``success`` (1 or 0), ``args`` (the request parameters),
          ``start_time`` and ``end_time`` (local time, ``%Y-%m-%d %H:%M:%S``)
          and, on failure only, ``msg`` holding the traceback with newlines
          replaced by tabs.
      """
      # Look the logger up here so the handler also works when this module is
      # imported rather than run as a script (the global `logger` is only bound
      # under the `__main__` guard).
      log = logging.getLogger("data_join")
      # Record when the request started.
      start_time = time.time()
      result = {}
      success = 0
      # Pre-bind so the response can be built even if reading the request fails.
      args = {}
      print("Program starts execution!")
      try:
          args = request.values.to_dict()
          print('args', args)
          log.info(args)
          df_list = get_data_from_mongo(args)
          res_df = data_merge(df_list, args)
          insert_data_into_mongo(res_df, args)
          success = 1
      except Exception:
          log.exception("data_join failed")
          # str.replace returns a new string; the result was discarded before,
          # so the newlines were never actually converted to tabs.
          result['msg'] = traceback.format_exc().replace("\n", "\t")
      end_time = time.time()
      result['success'] = success
      result['args'] = args
      result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
      result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
      print("Program execution ends!")
      return result
  if __name__=="__main__":
      # Script entry point: configure logging, then serve the app with waitress.
      print("Program starts execution!")
      logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
      logger = logging.getLogger("data_join")
      from waitress import serve
      # serve() blocks until the server shuts down, so the start-up message has
      # to be printed before the call, not after it.
      print("server start!")
      # Listens on all interfaces, port 10094.
      serve(app, host="0.0.0.0", port=10094)