12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091 |
- import pandas as pd
- from pymongo import MongoClient
- from flask import Flask,request,jsonify
- import time
- import logging
- import traceback
- from functools import reduce
# Flask application object; the route decorators below register handlers on it.
app = Flask('data_join——service')


@app.route('/hello', methods=['POST'])
def hello():
    """Answer POST /hello with a fixed JSON greeting."""
    greeting = {'message': 'Hello, World!'}
    return jsonify(greeting)
def get_data_from_mongo(args):
    """Load every requested MongoDB collection into a pandas DataFrame.

    Args:
        args: Mapping with the keys ``mongodb_database`` (database name) and
            ``mongodb_read_table`` (comma-separated collection names).

    Returns:
        A list holding one DataFrame per requested collection, in the order
        the collection names were given.

    Raises:
        KeyError: If ``mongodb_database`` or ``mongodb_read_table`` is missing
            from ``args``.
    """
    # SECURITY(review): the credentials are hard-coded; they should come from
    # configuration or the environment. The URI is deliberately kept on its own
    # line so that a traceback raised by the ``args`` lookups below does not
    # echo the source line containing the password.
    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    mongodb_database = args['mongodb_database']
    mongodb_read_table = args['mongodb_read_table'].split(',')

    df_list = []
    # The context manager closes the client even when a query or the
    # DataFrame construction raises.
    with MongoClient(mongodb_connection) as client:
        # Select the database (MongoDB creates it lazily if it does not exist).
        db = client[mongodb_database]
        for table in mongodb_read_table:
            # find() returns a cursor; materialise it into a list of documents
            # before building the DataFrame.
            documents = list(db[table].find())
            df_list.append(pd.DataFrame(documents))
    return df_list
def insert_data_into_mongo(res_df, args):
    """Replace the target MongoDB collection with the rows of ``res_df``.

    An existing collection with the target name is dropped first; each row of
    the DataFrame is then inserted as one document.

    Args:
        res_df: DataFrame whose rows are written to MongoDB.
        args: Mapping with the keys ``mongodb_database`` (database name) and
            ``mongodb_write_table`` (name of the collection to (re)create).

    Raises:
        KeyError: If ``mongodb_database`` or ``mongodb_write_table`` is missing
            from ``args``.
    """
    # SECURITY(review): the credentials are hard-coded; they should come from
    # configuration or the environment. The URI is deliberately kept on its own
    # line so that a traceback raised by the ``args`` lookups below does not
    # echo the source line containing the password.
    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']

    # Convert each row to a dict before touching the database, so a conversion
    # failure leaves the existing collection intact.
    data_dict = res_df.to_dict("records")

    # The context manager closes the client on every exit path.
    with MongoClient(mongodb_connection) as client:
        db = client[mongodb_database]
        if mongodb_write_table in db.list_collection_names():
            db[mongodb_write_table].drop()
            print(f"Collection '{mongodb_write_table}' already exist, deleted successfully!")
        if not data_dict:
            # insert_many() rejects an empty document list, so there is
            # nothing to write for an empty DataFrame.
            print("no data to insert!")
            return
        db[mongodb_write_table].insert_many(data_dict)
        print("data inserted successfully!")
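# 1. Detect power curtailment from AGC/AVC signals (accurate at some stations, inaccurate at others) - one method; the database data has problems, so it cannot be used for now.
# NOTE(review): this remark appears unrelated to data_merge below - confirm whether it is a leftover.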
def data_merge(df_list, args):
    """Fold a list of DataFrames into one by merging them left to right.

    Args:
        df_list: DataFrames to combine, merged pairwise in list order.
        args: Mapping providing ``join_key`` (passed to ``pd.merge`` as
            ``on``) and ``join_type`` (passed to ``pd.merge`` as ``how``).

    Returns:
        The merged DataFrame. A single-element list yields that element
        unchanged, because no merge is performed.
    """
    on_columns = args['join_key']
    how = args['join_type']

    def _merge_pair(accumulated, incoming):
        # Merge the running result with the next frame in the list.
        return pd.merge(accumulated, incoming, how=how, on=on_columns)

    return reduce(_merge_pair, df_list)
@app.route('/data_join', methods=['POST'])
def data_join():
    """Handle POST /data_join: read collections, merge them, write the result.

    The request parameters are passed unchanged to ``get_data_from_mongo``,
    ``data_merge`` and ``insert_data_into_mongo``.

    Returns:
        A dict with ``success`` (1 on success, 0 on failure), ``args`` (the
        request parameters that were read), ``start_time`` and ``end_time``
        (local time, ``YYYY-mm-dd HH:MM:SS``) and, on failure only, ``msg``
        (the traceback with newlines replaced by tabs).
    """
    # Record when processing started.
    start_time = time.time()
    result = {}
    success = 0
    # Bound before the try block so the response can always report it, even
    # when reading the request parameters itself fails.
    args = {}
    # Looked up here instead of relying on the module-level ``logger``, which
    # only exists when the file is executed as a script.
    log = logging.getLogger("data_join")
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        log.info(args)
        df_list = get_data_from_mongo(args)
        res_df = data_merge(df_list, args)
        insert_data_into_mongo(res_df, args)
        success = 1
    except Exception:
        # str.replace returns a new string; keep the result so the message is
        # actually flattened onto a single line.
        # SECURITY(review): the full traceback is returned to the caller.
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result
if __name__ == "__main__":
    # Script entry point: configure logging, then serve the app with waitress.
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # Module-level logger named "data_join".
    logger = logging.getLogger("data_join")
    from waitress import serve
    # serve() blocks until the server shuts down, so announce start-up first.
    print("server start!")
    serve(app, host="0.0.0.0", port=10094)
-
-
-
|