# data_join.py (3.3 KB)
import logging
import os
import time
import traceback
from functools import reduce

import pandas as pd
from flask import Flask, jsonify, request
from pymongo import MongoClient
from sqlalchemy import create_engine
  9. app = Flask('data_join——service')
  10. @app.route('/hello', methods=['POST'])
  11. def hello():
  12. return jsonify(message='Hello, World!')
  13. def get_data_from_mongo(args):
  14. mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'].split(',')
  15. df_list = []
  16. client = MongoClient(mongodb_connection)
  17. # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
  18. db = client[mongodb_database]
  19. for table in mongodb_read_table:
  20. collection = db[table] # 集合名称
  21. data_from_db = collection.find() # 这会返回一个游标(cursor)
  22. # 将游标转换为列表,并创建 pandas DataFrame
  23. df = pd.DataFrame(list(data_from_db))
  24. df_list.append(df)
  25. client.close()
  26. return df_list
  27. def insert_data_into_mongo(res_df,args):
  28. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  29. client = MongoClient(mongodb_connection)
  30. db = client[mongodb_database]
  31. if mongodb_write_table in db.list_collection_names():
  32. db[mongodb_write_table].drop()
  33. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  34. collection = db[mongodb_write_table] # 集合名称
  35. # 将 DataFrame 转为字典格式
  36. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  37. # 插入到 MongoDB
  38. collection.insert_many(data_dict)
  39. print("data inserted successfully!")
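# 1. Determining power curtailment from AGC/AVC signals (accurate at some stations, not at others): one method; the database data is faulty, so it cannot be used for now.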
  41. def data_join(df_list, args):
  42. join_key,join_type = args['join_key'], args['join_type']
  43. result = reduce(lambda left, right: pd.merge(left, right, how='join_type', on=join_key), df_list)
  44. return result
  45. @app.route('/data_join', methods=['POST'])
  46. def data_join():
  47. # 获取程序开始时间
  48. start_time = time.time()
  49. result = {}
  50. success = 0
  51. print("Program starts execution!")
  52. try:
  53. args = request.values.to_dict()
  54. print('args',args)
  55. logger.info(args)
  56. df_list = get_data_from_mongo(args)
  57. res_df = data_join(df_list,args)
  58. insert_data_into_mongo(res_df,args)
  59. success = 1
  60. except Exception as e:
  61. my_exception = traceback.format_exc()
  62. my_exception.replace("\n","\t")
  63. result['msg'] = my_exception
  64. end_time = time.time()
  65. result['success'] = success
  66. result['args'] = args
  67. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  68. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  69. print("Program execution ends!")
  70. return result
  71. if __name__=="__main__":
  72. print("Program starts execution!")
  73. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  74. logger = logging.getLogger("data_join")
  75. from waitress import serve
  76. serve(app, host="0.0.0.0", port=10094)
  77. print("server start!")