12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- import argparse
- import pandas as pd
- import numpy as np
- from pymongo import MongoClient
- import matplotlib.pyplot as plt
- from flask import Flask,request,jsonify
- from waitress import serve
- import time
- import logging
- import traceback
# Flask application object; the POST route defined further down in this file
# is registered on it, and it is handed to waitress in the __main__ guard.
app = Flask('processing_limit_power_by_machines——service')
def get_data_from_mongo(args):
    """Load every document of one MongoDB collection into a DataFrame.

    Args:
        args: Mapping with the keys ``'mongodb_connection'`` (passed to
            ``MongoClient``), ``'mongodb_database'`` and
            ``'mongodb_read_table'`` (the collection to read).

    Returns:
        pandas.DataFrame built from the list of all documents returned by
        ``collection.find()``.

    Raises:
        KeyError: If one of the three keys is missing from ``args``.
    """
    # Resolve all settings first so a missing key fails before a connection
    # is opened.
    mongodb_connection = args['mongodb_connection']
    mongodb_database = args['mongodb_database']
    mongodb_read_table = args['mongodb_read_table']

    client = MongoClient(mongodb_connection)
    try:
        # Select the database and the collection by name.
        collection = client[mongodb_database][mongodb_read_table]
        # find() returns a cursor; materialise it before the client is
        # closed, then build the DataFrame from the resulting list.
        documents = list(collection.find())
        return pd.DataFrame(documents)
    finally:
        # Always release the connection, even when the query or the
        # DataFrame construction raises.
        client.close()
-
def insert_data_into_mongo(res_df, args):
    """Replace the contents of a MongoDB collection with the rows of ``res_df``.

    If the target collection already exists it is dropped first, then every
    row of ``res_df`` is inserted as one document.

    Args:
        res_df: pandas.DataFrame whose rows are written.
        args: Mapping with the keys ``'mongodb_connection'``,
            ``'mongodb_database'`` and ``'mongodb_write_table'``.

    Raises:
        KeyError: If one of the three keys is missing from ``args``.
    """
    mongodb_connection = args['mongodb_connection']
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']

    client = MongoClient(mongodb_connection)
    try:
        db = client[mongodb_database]
        if mongodb_write_table in db.list_collection_names():
            db[mongodb_write_table].drop()
            print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")

        # One dict per DataFrame row.
        data_dict = res_df.to_dict("records")
        if not data_dict:
            # insert_many() rejects an empty document list; an empty result
            # simply leaves the (already dropped) collection empty.
            print("no rows to insert, collection left empty!")
            return

        db[mongodb_write_table].insert_many(data_dict)
        print("data inserted successfully!")
    finally:
        # The client was previously never closed, leaking a connection pool
        # on every call.
        client.close()
-
# Reference-machine ("sample machine") method
def _plot_limit_points(power, col_rp, col_tp, is_limit, threshold):
    """Draw both power curves and mark the rows flagged as limited in red."""
    fig = plt.figure(figsize=(300, 6))
    try:
        plt.plot(power[col_rp], label=col_rp)
        plt.plot(power[col_tp], label=col_tp)
        plt.scatter(power.index[is_limit], power[col_rp][is_limit], color='red', label='限电点')
        plt.legend()
        plt.title(f' 95%置信水平计算置信区间 ---限电 threshold={threshold}')
        plt.show()
    finally:
        # Release the figure; otherwise every call keeps one more figure
        # alive inside pyplot's global registry.
        plt.close(fig)


def windLight_machine_judgment(power, args, *, show_plot=True):
    """Drop the rows of ``power`` that are judged to be power-limited.

    For every row ``diff = power[col_tp] - power[col_rp]`` is computed. The
    threshold is ``mean(diff) - 1.96 * std(diff) / sqrt(n_rows)`` and a row is
    flagged when ``abs(diff) > threshold``. Flagged rows are removed from the
    returned frame.

    NOTE(review): when the threshold is negative every row with a non-NaN
    diff is flagged, because ``abs(diff)`` is never negative -- confirm this
    is the intended rule.

    Args:
        power: pandas.DataFrame containing the two columns named in ``args``.
            It is not modified.
        args: Mapping with the keys ``'col_rp'`` and ``'col_tp'`` naming the
            two columns that are compared.
        show_plot: Keyword-only. When true (the default, matching the
            previous behaviour) the curves and the flagged points are shown
            with matplotlib; pass ``False`` to skip plotting.

    Returns:
        A new DataFrame holding only the rows that were not flagged, without
        any ``'diff'`` / ``'is_limit_machine'`` helper columns.
    """
    col_rp, col_tp = args['col_rp'], args['col_tp']

    # Work on local Series instead of writing helper columns into the
    # caller's DataFrame.
    diff = power[col_tp] - power[col_rp]
    threshold = diff.mean() - 1.96 * (diff.std() / np.sqrt(power.shape[0]))
    print("threshold", threshold)

    # NaN differences compare False and are therefore kept.
    is_limit = diff.abs() > threshold

    if show_plot:
        # Visualise the result.
        _plot_limit_points(power, col_rp, col_tp, is_limit, threshold)

    kept = power[~is_limit]
    # The helper column names were always stripped from the output before;
    # keep that guarantee even if the input already carried such columns.
    return kept.drop(columns=['diff', 'is_limit_machine'], errors='ignore')
@app.route('/processing_limit_power_by_machines', methods=['POST'])
def processing_limit_power_by_machines():
    """Handle a POST request: read, filter and write back the power data.

    The request values are turned into a dict and passed unchanged to
    ``get_data_from_mongo``, ``windLight_machine_judgment`` and
    ``insert_data_into_mongo``.

    Returns:
        dict with the keys ``'success'`` (1 when all three steps completed,
        otherwise 0), ``'args'`` (the request values), ``'start_time'`` and
        ``'end_time'`` (local time, ``%Y-%m-%d %H:%M:%S``) and, on failure,
        ``'msg'`` holding the traceback on a single line.
    """
    # Record when processing started.
    start_time = time.time()
    result = {}
    success = 0
    # Bound up front so the response can be built even if reading the
    # request values itself fails.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        # NOTE(review): args includes 'mongodb_connection'; it is printed
        # here and echoed in the response -- confirm it carries no
        # credentials.
        print("*********", args, "*******")
        power_df = get_data_from_mongo(args)
        res_df = windLight_machine_judgment(power_df, args)
        insert_data_into_mongo(res_df, args)
        success = 1
    except Exception:
        # Top-level request boundary: report the failure to the caller
        # instead of raising. str.replace returns a new string, so its
        # result must be used for the newlines to become tabs.
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()

    time_format = '%Y-%m-%d %H:%M:%S'
    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime(time_format, time.localtime(start_time))
    result['end_time'] = time.strftime(time_format, time.localtime(end_time))
    print("Program execution ends!")
    return result
-
if __name__ == "__main__":
    # Script entry point: configure logging, then serve the Flask app with
    # waitress on all interfaces, port 10087.
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    logger = logging.getLogger("processing_limit_power_by_machines log")
    # serve() blocks until the server shuts down, so the start message has
    # to be emitted before the call, not after it.
    print("server start!")
    serve(app, host="0.0.0.0", port=10087)
|