import argparse import pandas as pd from pymongo import MongoClient from sqlalchemy import create_engine from flask import Flask,request,jsonify from waitress import serve import time import logging import traceback app = Flask('processing_limit_power_by_agcavc——service') @app.route('/hello', methods=['POST','GET']) def hello(): return jsonify(message='Hello, World!') def get_data_from_mongo(args): mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'] client = MongoClient(mongodb_connection) # 选择数据库(如果数据库不存在,MongoDB 会自动创建) db = client[mongodb_database] collection = db[mongodb_read_table] # 集合名称 data_from_db = collection.find() # 这会返回一个游标(cursor) # 将游标转换为列表,并创建 pandas DataFrame df = pd.DataFrame(list(data_from_db)) client.close() return df def insert_data_into_mongo(res_df,args): mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table'] client = MongoClient(mongodb_connection) db = client[mongodb_database] if mongodb_write_table in db.list_collection_names(): db[mongodb_write_table].drop() print(f"Collection '{mongodb_write_table} already exist, deleted successfully!") collection = db[mongodb_write_table] # 集合名称 # 将 DataFrame 转为字典格式 data_dict = res_df.to_dict("records") # 每一行作为一个字典 # 插入到 MongoDB collection.insert_many(data_dict) print("data inserted successfully!") #1.AGC/AVC信号判断限电(有的场站准 有的不准) 1种方法 数据库数据有问题 暂时用不了 def agc_avc_judgement(power_df,args): timeBegin,timeEnd,col_time,mysql_connection,avc_table = args['timeBegin'], args['timeEnd'],args['col_time'],args['mysql_connection'],args['agc_avc_table'] #限电记录 clean_record = [] # 创建连接 # cnx = mysql.connector.connect(user=user,password=password,host=host,port=port,database=database) # # 创建一个游标对象 # cursor = cnx.cursor() engine = create_engine(mysql_connection) # 定义SQL查询 df = pd.read_sql_query(f"select C_TIME AS {col_time}, 1 as agc_avc_limit from {avc_table} where C_TIME>='{timeBegin} 00:00:00' and C_TIME<='{timeEnd} 23:59:59' and (C_IS_RATIONING_BY_AUTO_CONTROL is True or C_IS_RATIONING_BY_MANUAL_CONTROL=1)", engine) df[col_time] = pd.to_datetime(df[col_time]).dt.strftime('%Y-%m-%d %H:%M:%S') if df.shape[0]>0: print(f"根据限电记录清洗,{timeBegin}至{timeEnd},限电时长共计{round(df.shape[0]/60,2)}小时") clean_record = df[col_time].tolist() # 关闭游标和连接 # cursor.close() # cnx.close() print(power_df.columns) power_df = pd.merge(power_df,df,on=col_time,how='left') return power_df[power_df['agc_avc_limit']!=1].drop('agc_avc_limit',axis=1) @app.route('/processing_limit_power_by_agcavc', methods=['POST','GET']) def processing_limit_power_by_agcavc(): # 获取程序开始时间 start_time = time.time() result = {} success = 0 print("Program starts execution!") try: args = request.values.to_dict() print('args',args) logger.info(args) power_df = get_data_from_mongo(args) res_df = agc_avc_judgement(power_df,args) insert_data_into_mongo(res_df,args) success = 1 except Exception as e: my_exception = traceback.format_exc() my_exception.replace("\n","\t") result['msg'] = my_exception end_time = time.time() result['success'] = success result['args'] = args result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)) result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)) print("Program execution ends!") return result if __name__=="__main__": print("Program starts execution!") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("processing_limit_power_by_agcavc") from waitress import serve serve(app, host="0.0.0.0", port=10086) print("server start!")