import argparse
import logging
import os
import time
import traceback

import numpy as np
import pandas as pd
from bson.decimal128 import Decimal128
from flask import Flask, request, jsonify
from pymongo import MongoClient
from sklearn.linear_model import LinearRegression
from sqlalchemy import create_engine
from waitress import serve

# Connection string used when the MONGODB_URI environment variable is not set.
# NOTE(review): credentials are hard-coded here; prefer supplying MONGODB_URI
# through the environment and removing this default.
_DEFAULT_MONGODB_URI = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"

# Module-level so request handlers can log even when the module is not run as
# __main__ (previously `logger` only existed inside the __main__ block).
logger = logging.getLogger("统计法清洗光伏场站限电")

app = Flask('processing_limit_power_by_statistics_light——service')


def _mongodb_uri() -> str:
    """Return the MongoDB connection string: $MONGODB_URI if set, else the default."""
    return os.environ.get("MONGODB_URI", _DEFAULT_MONGODB_URI)


def _format_ts(timestamp: float) -> str:
    """Format an epoch timestamp as local time ``YYYY-mm-dd HH:MM:SS``."""
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(timestamp))


def _to_float(value) -> float:
    """Convert a value read from MongoDB to ``float``; unsupported values become NaN.

    ``Decimal128`` is unwrapped via ``to_decimal()``. Plain ints/floats (BSON
    int32/int64/double) are accepted as well -- previously they were silently
    turned into NaN. Booleans, None, strings and anything else map to NaN.
    """
    if isinstance(value, Decimal128):
        return float(value.to_decimal())
    if isinstance(value, (int, float, np.integer, np.floating)) and not isinstance(value, bool):
        return float(value)
    return np.nan


@app.route('/hello', methods=['GET'])
def hello():
    """Liveness check: respond with a fixed JSON greeting."""
    return jsonify(message='Hello, World!')


def get_data_from_mongo(args: dict) -> pd.DataFrame:
    """Read every document of ``args['mongodb_read_table']`` into a DataFrame.

    Args:
        args: Must contain ``mongodb_database`` and ``mongodb_read_table``.

    Returns:
        One row per document (columns are the union of document fields).
    """
    mongodb_database = args['mongodb_database']
    mongodb_read_table = args['mongodb_read_table']
    # The context manager closes the client even if the query fails.
    with MongoClient(_mongodb_uri()) as client:
        collection = client[mongodb_database][mongodb_read_table]
        return pd.DataFrame(list(collection.find()))


def insert_data_into_mongo(res_df: pd.DataFrame, args: dict) -> None:
    """Replace ``args['mongodb_write_table']`` with the rows of ``res_df``.

    The target collection is dropped first if it exists, so this overwrites
    previous results. An empty ``res_df`` leaves the collection dropped/empty
    instead of failing (``insert_many`` rejects an empty document list).

    Args:
        res_df: Rows to write; each row becomes one document.
        args: Must contain ``mongodb_database`` and ``mongodb_write_table``.
    """
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']
    with MongoClient(_mongodb_uri()) as client:
        db = client[mongodb_database]
        if mongodb_write_table in db.list_collection_names():
            db[mongodb_write_table].drop()
            logger.info("Collection '%s' already exist, deleted successfully!", mongodb_write_table)

        records = res_df.to_dict("records")  # one dict per row
        if not records:
            logger.warning("No rows to insert into '%s'; collection left empty.", mongodb_write_table)
            return
        db[mongodb_write_table].insert_many(records)
    logger.info("data inserted successfully!")


def light_statistics_judgement(df_power: pd.DataFrame, args: dict) -> pd.DataFrame:
    """Drop power-limited (curtailed) points using a linear irradiance -> power fit.

    Assumption: measured irradiance and actual power are strongly, positively
    and linearly correlated, and fit errors are roughly normally distributed.
    A point is kept when its power lies within ``k*x + b +/- margin`` where
    ``margin = mean(|residual|) + sigma * std(|residual|)``; the band is clipped
    to ``[0, power_upper]``.

    Args:
        df_power: Raw records containing the ``col_radiance`` and ``col_power`` columns.
        args: Request parameters. Required: ``col_radiance``, ``col_power``,
            ``sigma`` (float-convertible). Optional: ``power_upper`` (upper clip
            of the band, default 100 as before).

    Returns:
        The rows whose power falls inside the band, with the two measurement
        columns converted to float; other columns are passed through.

    Raises:
        KeyError: A required key or column is missing.
        ValueError: ``sigma``/``power_upper`` is not numeric, or no valid samples remain.
    """
    col_radiance, col_power = args['col_radiance'], args['col_power']
    sigma = float(args['sigma'])
    # Default reproduces the original hard-coded cap of 100 (presumably a
    # percentage / normalized power scale -- confirm per station).
    power_upper = float(args.get('power_upper', 100))
    origin_records = df_power.shape[0]

    # Convert the measurements on a new frame rather than mutating the caller's.
    df_power = df_power.assign(**{
        col_radiance: df_power[col_radiance].map(_to_float),
        col_power: df_power[col_power].map(_to_float),
    })

    radiance, power = df_power[col_radiance], df_power[col_power]
    # Drop missing values and points reporting positive power at non-positive irradiance.
    valid = radiance.notna() & power.notna() & ~((radiance <= 0) & (power > 0))
    df_power = df_power[valid]
    if df_power.empty:
        raise ValueError(f"no valid samples in columns {col_radiance!r}/{col_power!r} to fit")

    X = df_power[[col_radiance]].to_numpy(dtype=float)
    y = df_power[col_power].to_numpy(dtype=float)

    # Fit power = k * irradiance + b.
    model = LinearRegression().fit(X, y)
    k_final, b = model.coef_[0], model.intercept_

    abs_residual = np.abs(y - model.predict(X))
    margin = abs_residual.mean() + sigma * abs_residual.std()
    logger.info("margin: %s", margin)

    # Vectorised acceptance band (same arithmetic as the former per-row check).
    baseline = k_final * X[:, 0] + b
    high = np.minimum(baseline + margin, power_upper)
    low = np.maximum(baseline - margin, 0)
    new_df_power = df_power[(low <= y) & (y <= high)]

    logger.info("未清洗限电前,总共有:%s条数据", origin_records)
    logger.info("清除异常点后保留的点有:%s, 占比:%s",
                len(new_df_power), round(len(new_df_power) / origin_records, 2))
    return new_df_power


@app.route('/processing_limit_power_by_statistics_light', methods=['POST', 'GET'])
def processing_limit_power_by_statistics_light():
    """Load records from MongoDB, drop curtailed points, and write the result back.

    Parameters come from ``request.values`` (query string and form): see
    ``get_data_from_mongo``, ``light_statistics_judgement`` and
    ``insert_data_into_mongo`` for the keys each step reads.

    Returns:
        A dict (serialized to JSON by Flask) with ``success`` (1/0), ``args``,
        ``start_time``, ``end_time`` and, on failure, ``msg`` holding the
        traceback with newlines replaced by tabs.
    """
    start_time = time.time()
    result = {}
    success = 0
    args = {}  # so the response can still be built if reading the request fails
    logger.info("Program starts execution!")
    try:
        args = request.values.to_dict()
        logger.info("args %s", args)
        power_df = get_data_from_mongo(args)
        res_df = light_statistics_judgement(power_df, args)
        insert_data_into_mongo(res_df, args)
        success = 1
    except Exception:
        # Service boundary: report the failure in the response instead of a 500.
        logger.exception("processing failed")
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = _format_ts(start_time)
    result['end_time'] = _format_ts(end_time)
    logger.info("Program execution ends!")
    return result


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO,
                        format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    # serve() blocks until shutdown, so announce startup before calling it.
    logger.info("server start!")
    serve(app, host="0.0.0.0", port=10085)