# processing_limit_power_by_statistics_light.py (5.3 KB)
  1. import argparse
  2. import pandas as pd
  3. from pymongo import MongoClient
  4. from sqlalchemy import create_engine
  5. from flask import Flask,request,jsonify
  6. from waitress import serve
  7. import time
  8. import logging
  9. import traceback
  10. from sklearn.linear_model import LinearRegression
  11. import numpy as np
  12. from bson.decimal128 import Decimal128
# Flask application object; the @app.route decorators in this file register
# their handlers on it, and it is the object handed to waitress' serve().
app = Flask('processing_limit_power_by_statistics_light——service')
  14. @app.route('/hello', methods=['GET'])
  15. def hello():
  16. return jsonify(message='Hello, World!')
  17. def get_data_from_mongo(args):
  18. mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
  19. client = MongoClient(mongodb_connection)
  20. # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
  21. db = client[mongodb_database]
  22. collection = db[mongodb_read_table] # 集合名称
  23. data_from_db = collection.find() # 这会返回一个游标(cursor)
  24. # 将游标转换为列表,并创建 pandas DataFrame
  25. df = pd.DataFrame(list(data_from_db))
  26. client.close()
  27. return df
  28. def insert_data_into_mongo(res_df,args):
  29. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  30. client = MongoClient(mongodb_connection)
  31. db = client[mongodb_database]
  32. if mongodb_write_table in db.list_collection_names():
  33. db[mongodb_write_table].drop()
  34. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  35. collection = db[mongodb_write_table] # 集合名称
  36. # 将 DataFrame 转为字典格式
  37. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  38. # 插入到 MongoDB
  39. collection.insert_many(data_dict)
  40. print("data inserted successfully!")
  41. def light_statistics_judgement(df_power,args):
  42. """
  43. 原理:基于实测辐照度与实际功率相关性强正相关,呈严格线性关系为假设前提,
  44. 假设误差大致呈现标准正态分布 mean + N*std
  45. """
  46. col_radiance, col_power, sigma=args['col_radiance'],args['col_power'],float(args['sigma'])
  47. origin_records = df_power.shape[0]
  48. # 提取辐射度和实际功率
  49. df_power[col_radiance] = df_power[col_radiance].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else np.nan)
  50. df_power[col_power] = df_power[col_power].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else np.nan)
  51. df_power = df_power[(~pd.isna(df_power[col_radiance]))&(~pd.isna(df_power[col_power]))&(~((df_power[col_radiance]<=0)&(df_power[col_power]>0)))]
  52. X = df_power[[col_radiance]].values
  53. y = df_power[col_power].values
  54. print(X)
  55. # 创建线性回归模型并拟合
  56. model = LinearRegression()
  57. model.fit(X, y)
  58. # 获取斜率和偏移量
  59. k_final = model.coef_[0]
  60. b = model.intercept_
  61. # 计算预测的实际功率范围
  62. predicted_power = model.predict(X)
  63. margin = np.mean(abs(y - predicted_power))+ sigma * np.std(abs(y - predicted_power))
  64. print("margin:",margin)
  65. # 过滤数据
  66. def filter_unlimited_power(zfs, real_power):
  67. high = min(k_final * zfs + b + margin, 100)
  68. low = max(k_final * zfs + b - margin, 0)
  69. return low <= real_power <= high
  70. # 应用过滤并标记数据
  71. df_power['c'] = df_power.apply(lambda row: 'green' if filter_unlimited_power(row[col_radiance], row[col_power]) else 'red', axis=1)
  72. df_power['is_limit'] = df_power['c'].apply(lambda x: False if x=='green' else True)
  73. # df_power.plot.scatter(x=col_radiance, y=col_power, c='c')
  74. #过滤掉限电点
  75. new_df_power = df_power[df_power['is_limit'] == False]
  76. print(f"未清洗限电前,总共有:{origin_records}条数据")
  77. print(f"清除异常点后保留的点有:{len(new_df_power)}, 占比:{round(len(new_df_power) / origin_records, 2)}")
  78. return df_power[df_power['is_limit'] == False].drop(['is_limit','c'],axis=1)
  79. @app.route('/processing_limit_power_by_statistics_light', methods=['POST','GET'])
  80. def processing_limit_power_by_statistics_light():
  81. # 获取程序开始时间
  82. start_time = time.time()
  83. result = {}
  84. success = 0
  85. print("Program starts execution!")
  86. try:
  87. args = request.values.to_dict()
  88. print('args',args)
  89. logger.info(args)
  90. power_df = get_data_from_mongo(args)
  91. res_df = light_statistics_judgement(power_df,args)
  92. insert_data_into_mongo(res_df,args)
  93. success = 1
  94. except Exception as e:
  95. my_exception = traceback.format_exc()
  96. my_exception.replace("\n","\t")
  97. result['msg'] = my_exception
  98. end_time = time.time()
  99. result['success'] = success
  100. result['args'] = args
  101. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  102. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  103. print("Program execution ends!")
  104. return result
  105. if __name__=="__main__":
  106. print("Program starts execution!")
  107. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  108. logger = logging.getLogger("统计法清洗光伏场站限电")
  109. from waitress import serve
  110. serve(app, host="0.0.0.0", port=10085)
  111. print("server start!")