Bladeren bron

awg commit algorithm components

anweiguo 4 maanden geleden
bovenliggende
commit
e30d503239

+ 214 - 0
common/database_dml.py

@@ -0,0 +1,214 @@
+from pymongo import MongoClient, UpdateOne
+import pandas as pd
+from sqlalchemy import create_engine
+import pickle
+from io import BytesIO
+import joblib
+import h5py
+import tensorflow as tf
+
+def get_data_from_mongo(args):
+    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
+    mongodb_database = args['mongodb_database']
+    mongodb_read_table = args['mongodb_read_table']
+    query_dict = {}
+    if 'timeBegin' in args.keys():
+        timeBegin = args['timeBegin']
+        query_dict.update({"$gte": timeBegin})
+    if 'timeEnd' in args.keys():
+        timeEnd = args['timeEnd']
+        query_dict.update({"$lte": timeEnd})
+
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    collection = db[mongodb_read_table]  # 集合名称
+    if len(query_dict) != 0:
+        query = {"dateTime": query_dict}
+        cursor = collection.find(query)
+    else:
+        cursor = collection.find()
+    data = list(cursor)
+    df = pd.DataFrame(data)
+    # 4. 删除 _id 字段(可选)
+    if '_id' in df.columns:
+        df = df.drop(columns=['_id'])
+    client.close()
+    return df
+
+
+def get_df_list_from_mongo(args):
+    mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'].split(',')
+    df_list = []
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    for table in mongodb_read_table:
+        collection = db[table]  # 集合名称
+        data_from_db = collection.find()  # 这会返回一个游标(cursor)
+        # 将游标转换为列表,并创建 pandas DataFrame
+        df = pd.DataFrame(list(data_from_db))
+        if '_id' in df.columns:
+            df = df.drop(columns=['_id'])
+        df_list.append(df)
+    client.close()
+    return df_list
+
+def insert_data_into_mongo(res_df, args):
+    """
+    插入数据到 MongoDB 集合中,可以选择覆盖、追加或按指定的 key 进行更新插入。
+
+    参数:
+    - res_df: 要插入的 DataFrame 数据
+    - args: 包含 MongoDB 数据库和集合名称的字典
+    - overwrite: 布尔值,True 表示覆盖,False 表示追加
+    - update_keys: 列表,指定用于匹配的 key 列,如果存在则更新,否则插入 'col1','col2'
+    """
+    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
+    mongodb_database = args['mongodb_database']
+    mongodb_write_table = args['mongodb_write_table']
+    overwrite = 1
+    update_keys = None
+    if 'overwrite' in args.keys():
+        overwrite = int(args['overwrite'])
+    if 'update_keys' in args.keys():
+        update_keys = args['update_keys'].split(',')
+
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    collection = db[mongodb_write_table]
+
+    # 覆盖模式:删除现有集合
+    if overwrite:
+        if mongodb_write_table in db.list_collection_names():
+            collection.drop()
+            print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
+
+    # 将 DataFrame 转为字典格式
+    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
+
+    # 如果没有数据,直接返回
+    if not data_dict:
+        print("No data to insert.")
+        return
+
+    # 如果指定了 update_keys,则执行 upsert(更新或插入)
+    if update_keys and not overwrite:
+        operations = []
+        for record in data_dict:
+            # 构建查询条件,用于匹配要更新的文档
+            query = {key: record[key] for key in update_keys}
+            operations.append(UpdateOne(query, {'$set': record}, upsert=True))
+
+        # 批量执行更新/插入操作
+        if operations:
+            result = collection.bulk_write(operations)
+            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
+    else:
+        # 追加模式:直接插入新数据
+        collection.insert_many(data_dict)
+        print("Data inserted successfully!")
+
+
+def get_data_fromMysql(params):
+    mysql_conn = params['mysql_conn']
+    query_sql = params['query_sql']
+    #数据库读取实测气象
+    engine = create_engine(f"mysql+pymysql://{mysql_conn}")
+    # 定义SQL查询
+    env_df = pd.read_sql_query(query_sql, engine)
+    return env_df
+
+
+def insert_pickle_model_into_mongo(model, args):
+    mongodb_connection, mongodb_database, mongodb_write_table, model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", \
+    args['mongodb_database'], args['mongodb_write_table'], args['model_name']
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    # 序列化模型
+    model_bytes = pickle.dumps(model)
+    model_data = {
+        'model_name': model_name,
+        'model': model_bytes,  # 将模型字节流存入数据库
+    }
+    print('Training completed!')
+
+    if mongodb_write_table in db.list_collection_names():
+        db[mongodb_write_table].drop()
+        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
+    collection = db[mongodb_write_table]  # 集合名称
+    collection.insert_one(model_data)
+    print("model inserted successfully!")
+
+
+def insert_h5_model_into_mongo(model,feature_scaler_bytes,target_scaler_bytes ,args):
+    mongodb_connection,mongodb_database,scaler_table,model_table,model_name = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
+                                args['mongodb_database'],args['scaler_table'],args['model_table'],args['model_name'])
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    collection = db[scaler_table]  # 集合名称
+    # Save the scalers in MongoDB as binary data
+    collection.insert_one({
+        "feature_scaler": feature_scaler_bytes.read(),
+        "target_scaler": target_scaler_bytes.read()
+    })
+    print("model inserted successfully!")
+    model_table = db[model_table]
+    # 创建 BytesIO 缓冲区
+    model_buffer = BytesIO()
+    # 将模型保存为 HDF5 格式到内存 (BytesIO)
+    model.save(model_buffer, save_format='h5')
+    # 将指针移到缓冲区的起始位置
+    model_buffer.seek(0)
+    # 获取模型的二进制数据
+    model_data = model_buffer.read()
+    # 将模型保存到 MongoDB
+    model_table.insert_one({
+        "model_name": model_name,
+        "model_data": model_data
+    })
+    print("模型成功保存到 MongoDB!")
+
+
+def get_h5_model_from_mongo(args):
+    mongodb_connection,mongodb_database,model_table,model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['model_table'],args['model_name']
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    collection = db[model_table]  # 集合名称
+
+     # 查询 MongoDB 获取模型数据
+    model_doc = collection.find_one({"model_name": model_name})
+    if model_doc:
+        model_data = model_doc['model_data']  # 获取模型的二进制数据
+        # 将二进制数据加载到 BytesIO 缓冲区
+        model_buffer = BytesIO(model_data)
+        # 从缓冲区加载模型
+         # 使用 h5py 和 BytesIO 从内存中加载模型
+        with h5py.File(model_buffer, 'r') as f:
+            model = tf.keras.models.load_model(f)
+        print(f"{model_name}模型成功从 MongoDB 加载!")
+        client.close()
+        return model
+    else:
+        print(f"未找到model_name为 {model_name} 的模型。")
+        client.close()
+        return None
+
+
+def get_scaler_model_from_mongo(args):
+    mongodb_connection, mongodb_database, scaler_table, = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
+                                                           args['mongodb_database'], args['scaler_table'])
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    collection = db[scaler_table]  # 集合名称
+    # Retrieve the scalers from MongoDB
+    scaler_doc = collection.find_one()
+    # Deserialize the scalers
+
+    feature_scaler_bytes = BytesIO(scaler_doc["feature_scaler"])
+    feature_scaler = joblib.load(feature_scaler_bytes)
+    target_scaler_bytes = BytesIO(scaler_doc["target_scaler"])
+    target_scaler = joblib.load(target_scaler_bytes)
+    return feature_scaler,target_scaler

+ 2 - 32
data_processing/data_operation/data_join.py

@@ -1,10 +1,10 @@
 import pandas as pd
-from pymongo import MongoClient
 from flask import Flask,request,jsonify
 import time
 import logging
 import traceback
 from functools import reduce
+from common.database_dml import get_df_list_from_mongo,insert_data_into_mongo
 
 app = Flask('data_join——service')
 
@@ -13,36 +13,6 @@ app = Flask('data_join——service')
 def hello():
     return jsonify(message='Hello, World!')
 
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'].split(',')
-    df_list = []
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    for table in mongodb_read_table:
-        collection = db[table]  # 集合名称
-        data_from_db = collection.find()  # 这会返回一个游标(cursor)
-        # 将游标转换为列表,并创建 pandas DataFrame
-        df = pd.DataFrame(list(data_from_db))
-        df_list.append(df)
-    client.close()
-    return df_list
-
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
-
 
 #1.AGC/AVC信号判断限电(有的场站准 有的不准) 1种方法  数据库数据有问题 暂时用不了
 def  data_merge(df_list, args):
@@ -62,7 +32,7 @@ def data_join():
         args = request.values.to_dict()
         print('args',args)
         logger.info(args)
-        df_list = get_data_from_mongo(args)
+        df_list = get_df_list_from_mongo(args)
         res_df = data_merge(df_list,args)
         insert_data_into_mongo(res_df,args)
         success = 1

+ 1 - 28
data_processing/data_operation/mysql_to_mongo.py

@@ -1,11 +1,8 @@
-import pandas as pd
-from pymongo import MongoClient
-from sqlalchemy import create_engine
 from flask import Flask,request,jsonify
 import time
 import logging
 import traceback
-
+from common.database_dml import insert_data_into_mongo,get_data_fromMysql
 app = Flask('mysql_to_mongo——service')
 
 
@@ -13,30 +10,6 @@ app = Flask('mysql_to_mongo——service')
 def hello():
     return jsonify(message='Hello, World!')
 
-def get_data_fromMysql(params):
-    mysql_conn = params['mysql_conn']
-    query_sql = params['query_sql']
-    #数据库读取实测气象
-    engine = create_engine(f"mysql+pymysql://{mysql_conn}")
-    # 定义SQL查询
-    env_df = pd.read_sql_query(query_sql, engine)
-    return env_df
-
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
-
 
 @app.route('/mysql_to_mongo', methods=['POST'])
 def data_join():

+ 1 - 28
data_processing/processing_limit_power/processing_limit_power_by_agcavc.py

@@ -5,6 +5,7 @@ from flask import Flask,request,jsonify
 import time
 import logging
 import traceback
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo
 app = Flask('processing_limit_power_by_agcavc——service')
 
 
@@ -13,34 +14,6 @@ def hello():
     return jsonify(message='Hello, World!')
 
 
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    data_from_db = collection.find()  # 这会返回一个游标(cursor)
-    # 将游标转换为列表,并创建 pandas DataFrame
-    df = pd.DataFrame(list(data_from_db))
-    client.close()
-    return df
-
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
-
-
 #1.AGC/AVC信号判断限电(有的场站准 有的不准) 1种方法  数据库数据有问题 暂时用不了
 def  agc_avc_judgement(power_df,args):
     timeBegin,timeEnd,col_time,mysql_connection,avc_table = args['timeBegin'], args['timeEnd'],args['col_time'],args['mysql_connection'],args['agc_avc_table']

+ 1 - 30
data_processing/processing_limit_power/processing_limit_power_by_machines.py

@@ -6,38 +6,9 @@ from flask import Flask, request
 import time
 import logging
 import traceback
-
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo
 app = Flask('processing_limit_power_by_machines——service')
 
-
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table = args['mongodb_connection'],args['mongodb_database'],args['mongodb_read_table']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    data_from_db = collection.find()  # 这会返回一个游标(cursor)
-    # 将游标转换为列表,并创建 pandas DataFrame
-    df = pd.DataFrame(list(data_from_db))
-    client.close()
-    return df
-    
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = args['mongodb_connection'],args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
-    
-
 #样板机法
 def windLight_machine_judgment(power,args):
     col_rp,col_tp = args['col_rp'],args['col_tp']

+ 1 - 28
data_processing/processing_limit_power/processing_limit_power_by_records.py

@@ -5,35 +5,8 @@ from flask import Flask,request
 import time
 import logging
 import traceback
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo
 app = Flask('processing_limit_power_by_records——service')
-
-
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    data_from_db = collection.find()  # 这会返回一个游标(cursor)
-    # 将游标转换为列表,并创建 pandas DataFrame
-    df = pd.DataFrame(list(data_from_db))
-    client.close()
-    return df
-    
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
     
 
 def limit_record_judgement(power,args):

+ 1 - 28
data_processing/processing_limit_power/processing_limit_power_by_statistics_light.py

@@ -8,6 +8,7 @@ from sklearn.linear_model import LinearRegression
 import numpy as np
 from bson.decimal128 import Decimal128
 import numbers
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo
 app = Flask('processing_limit_power_by_statistics_light——service')
 
 
@@ -16,34 +17,6 @@ def hello():
     return jsonify(message='Hello, World!')
 
 
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    data_from_db = collection.find()  # 这会返回一个游标(cursor)
-    # 将游标转换为列表,并创建 pandas DataFrame
-    df = pd.DataFrame(list(data_from_db))
-    client.close()
-    return df
-
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
-
-
 def light_statistics_judgement(df_power,args):
     """
     原理:基于实测辐照度与实际功率相关性强正相关,呈严格线性关系为假设前提,

+ 1 - 29
data_processing/processing_limit_power/processing_limit_power_by_statistics_wind.py

@@ -9,7 +9,7 @@ from sklearn.preprocessing import StandardScaler
 from sklearn.cluster import DBSCAN
 import numbers
 from bson.decimal128 import Decimal128
-
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo
 app = Flask('processing_limit_power_by_statistics_wind——service')
 
 
@@ -18,34 +18,6 @@ def hello():
     return jsonify(message='Hello, World!')
 
 
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    data_from_db = collection.find()  # 这会返回一个游标(cursor)
-    # 将游标转换为列表,并创建 pandas DataFrame
-    df = pd.DataFrame(list(data_from_db))
-    client.close()
-    return df
-
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
-
-
 def wind_statistics_judgement(df_power,args):
     """
     原理:基于实测辐照度与实际功率相关性强正相关,呈严格线性关系为假设前提,

+ 247 - 247
evaluation_processing/analysis.py

@@ -1,247 +1,247 @@
-# -*- coding: utf-8 -*-
-import pandas as pd
-import matplotlib.pyplot as plt
-from pymongo import MongoClient
-import pickle
-import numpy as np
-import plotly.express as px
-from plotly.subplots import make_subplots
-import plotly.graph_objects as go
-from flask import Flask,request,jsonify
-from waitress import serve
-import time
-import random
-import argparse
-import logging
-import traceback
-import os
-import lightgbm as lgb
-
-app = Flask('analysis_report——service')
-def get_data_from_mongo(args):
-    # 1.读数据 
-    mongodb_connection,mongodb_database,all_table,accuracy_table,model_table,model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['train_table'],args['accuracy_table'],args['model_table'],args['model_name']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    # 将游标转换为列表,并创建 pandas DataFrame
-    df_all = pd.DataFrame(db[all_table].find({}, {'_id': 0}))
-    
-    df_accuracy = pd.DataFrame(db[accuracy_table].find({}, {'_id': 0}))
-    
-    model_data = db[model_table].find_one({"model_name": model_name})
-    if model_data is not None:
-        model_binary = model_data['model']  # 确保这个字段是存储模型的二进制数据
-        # 反序列化模型 
-        model = pickle.loads(model_binary)
-    client.close()
-    return df_all,df_accuracy,model
-
-
-def draw_info(df_all,df_accuracy,model,features,args):
-    #1.数据描述 数据描述:
-    col_time = args['col_time']
-    label = args['label']
-    df_accuracy_beginTime = df_accuracy[col_time].min()
-    df_accuracy_endTime = df_accuracy[col_time].max()
-    df_train = df_all[df_all[col_time]<df_accuracy_beginTime][features+[col_time,label]]
-    df_train_beginTime = df_train[col_time].min()
-    df_train_endTime = df_train[col_time].max()
-    text_content = f"训练数据时间范围:{df_train_beginTime} 至 {df_train_endTime},共{df_train.shape[0]}条记录,测试集数据时间范围:{df_accuracy_beginTime} 至 {df_accuracy_endTime}。<br>lightgbm模型参数:{model.params}"
-    return text_content
-    
-
-
-def draw_global_scatter(df,args):
-    # --- 1. 实际功率和辐照度的散点图 ---
-    col_x = args['scatter_col_x']
-    col_y = args['label']
-    scatter_fig = px.scatter(
-        df,
-        x=col_x,
-        y=col_y,
-        title=f"{col_x}和{col_y}的散点图",
-        labels={"辐照度": "辐照度 (W/m²)", "实际功率": "实际功率 (kW)"}
-    )
-    return scatter_fig
-    
-    
-
-def draw_corr(df,features,args):
-
-    # --- 2. 相关性热力图 ---
-    # 计算相关性矩阵
-    label = args['label']
-    features_coor = features+[label]
-    corr_matrix = df[features_coor].corr()
-    # 使用 Plotly Express 绘制热力图
-    heatmap_fig = px.imshow(corr_matrix, 
-                    text_auto=True,  # 显示数值
-                    color_continuous_scale='RdBu',  # 配色方案
-                    title="Correlation Heatmap")
-    heatmap_fig.update_coloraxes(showscale=False)
-
-    return heatmap_fig
-
-def draw_feature_importance(model,features):
-    # --- 3. 特征重要性排名 ---
-    # 获取特征重要性
-    importance = model.feature_importance()  # 'split' 或 'gain',根据需求选择
-    
-    # 转换为 DataFrame 方便绘图
-    feature_importance_df = pd.DataFrame({
-        'Feature': features,
-        'Importance': importance
-    })
-    feature_importance_df = feature_importance_df.sort_values(by='Importance', ascending=False)
-    
-    # 使用 Plotly Express 绘制条形图
-    importance_fig = px.bar(feature_importance_df, x='Feature', y='Importance', 
-                 title="特征重要性排名", 
-                 labels={'Feature': '特征', 'Importance': '重要性'}, 
-                 color='Importance', 
-                 color_continuous_scale='Viridis')
-    # 更新每个 trace,确保没有图例
-    
-    importance_fig.update_layout(title="模型特征重要性排名", 
-                                 showlegend=False  # 移除图例
-                                )
-    importance_fig.update_coloraxes(showscale=False)
-    return importance_fig
-
-
-def draw_data_info_table(content):
-    # --- 4. 创建数据说明的表格 ---
-    # 转换为表格格式:1行1列,且填充文字说明
-    # 转换为表格格式  
-    # 创建一个空的图
-    table_fig = go.Figure()
-
-    # 第一部分: 显示文字说明
-    table_fig.add_trace(go.Table(
-        header=dict(
-            values=["说明"],  # 表格只有一列:说明
-            fill_color="paleturquoise",
-            align="center"
-        ),
-        cells=dict(
-            values=[[content]] ,  # 第一行填入文本说明
-            fill_color="lavender",
-            align="center"
-        )
-    ))
-
-   
-    return table_fig
-    
-
-
-def draw_accuracy_table(df,content):
-    
-    # --- 4. 每日的准确率表格 ---
-    # 转换为表格格式  
-    table_fig = go.Figure(
-        data=[
-            go.Table(
-                header=dict(
-                    values=list(df.columns),
-                    fill_color="paleturquoise",
-                    align="center"
-                ),
-                cells=dict(
-                    values=[df[col] for col in df.columns],
-                    fill_color="lavender",
-                    align="center"
-                )
-            )
-        ]
-    )
-    table_fig.update_layout(title="准确率表", showlegend=False)
-    return table_fig
-
-
-@app.route('/analysis_report', methods=['POST'])
-def analysis_report():
-    start_time = time.time()  
-    result = {}
-    success = 0
-    path = ""
-    print("Program starts execution!")
-    try:
-        args = request.values.to_dict()
-        print('args',args)
-        logger.info(args)
-        #获取数据
-        df_all, df_accuracy, model = get_data_from_mongo(args)
-        features = model.feature_name()
-        text_content = draw_info(df_all,df_accuracy,model,features,args)
-        text_fig,scatter_fig,heatmap_fig,importance_fig,table_fig=draw_data_info_table(text_content),draw_global_scatter(df_all,args),draw_corr(df_all,features,args),draw_feature_importance(model,features),\
-        draw_accuracy_table(df_accuracy,text_content)
-        # --- 合并图表并保存到一个 HTML 文件 ---
-        # 创建子图布局
-        combined_fig = make_subplots(
-            rows=5, cols=1,
-            subplot_titles=["数据-模型概览","辐照度和实际功率的散点图", "相关性","特征重要性排名", "准确率表"],
-            row_heights=[0.3, 0.6, 0.6, 0.6, 0.4],
-            specs=[[{"type": "table"}], [{"type": "xy"}], [{"type": "heatmap"}], [{"type": "xy"}],[{"type": "table"}]]  # 指定每个子图类型
-        )
-        # 添加文本信息到子图(第一行)
-        # 添加文字说明
-        for trace in text_fig.data:
-            combined_fig.add_trace(trace, row=1, col=1)
-            
-        # 添加散点图
-        for trace in scatter_fig.data:
-            combined_fig.add_trace(trace, row=2, col=1)
-        
-        # 添加相关性热力图
-        for trace in heatmap_fig.data:
-            combined_fig.add_trace(trace, row=3, col=1)
-            
-        # 添加特征重要性排名图
-        for trace in importance_fig.data:
-            combined_fig.add_trace(trace, row=4, col=1)
-        
-        # 添加表格
-        for trace in table_fig.data:
-            combined_fig.add_trace(trace, row=5, col=1)
-    
-        # 更新布局
-        combined_fig.update_layout(
-        height=1500,
-        title_text="分析结果汇总",  # 添加换行符以适应文本内容
-        title_x=0.5,  # 中心对齐标题
-        showlegend=False,
-        )
-        combined_fig.update_coloraxes(showscale=False)
-        filename = f"{int(time.time() * 1000)}_{random.randint(1000, 9999)}.html"
-        # 保存为 HTML
-        directory = '/usr/share/nginx/html'
-        if not os.path.exists(directory):
-            os.makedirs(directory)
-        file_path = os.path.join(directory, filename)
-        # combined_fig.write_html(f"D://usr//{filename}")
-        combined_fig.write_html(file_path)
-        path = f"http://ds2:10093/{filename}"
-        success = 1
-    except Exception as e:
-        my_exception = traceback.format_exc()
-        my_exception.replace("\n","\t")
-        result['msg'] = my_exception
-    end_time = time.time() 
-    result['success'] = success
-    result['args'] = args
-    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
-    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
-    result['file_path'] = path
-    print("Program execution ends!")
-    return result
-
-
-if __name__=="__main__":  
-    print("Program starts execution!")
-    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    logger = logging.getLogger("analysis_report log")
-    from waitress import serve
-    serve(app, host="0.0.0.0", port=10092)
-    print("server start!")
+# # -*- coding: utf-8 -*-
+# import pandas as pd
+# import matplotlib.pyplot as plt
+# from pymongo import MongoClient
+# import pickle
+# import numpy as np
+# import plotly.express as px
+# from plotly.subplots import make_subplots
+# import plotly.graph_objects as go
+# from flask import Flask,request,jsonify
+# from waitress import serve
+# import time
+# import random
+# import argparse
+# import logging
+# import traceback
+# import os
+# import lightgbm as lgb
+#
+# app = Flask('analysis_report——service')
+# def get_data_from_mongo(args):
+#     # 1.读数据
+#     mongodb_connection,mongodb_database,all_table,accuracy_table,model_table,model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['train_table'],args['accuracy_table'],args['model_table'],args['model_name']
+#     client = MongoClient(mongodb_connection)
+#     # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+#     db = client[mongodb_database]
+#     # 将游标转换为列表,并创建 pandas DataFrame
+#     df_all = pd.DataFrame(db[all_table].find({}, {'_id': 0}))
+#
+#     df_accuracy = pd.DataFrame(db[accuracy_table].find({}, {'_id': 0}))
+#
+#     model_data = db[model_table].find_one({"model_name": model_name})
+#     if model_data is not None:
+#         model_binary = model_data['model']  # 确保这个字段是存储模型的二进制数据
+#         # 反序列化模型
+#         model = pickle.loads(model_binary)
+#     client.close()
+#     return df_all,df_accuracy,model
+#
+#
+# def draw_info(df_all,df_accuracy,model,features,args):
+#     #1.数据描述 数据描述:
+#     col_time = args['col_time']
+#     label = args['label']
+#     df_accuracy_beginTime = df_accuracy[col_time].min()
+#     df_accuracy_endTime = df_accuracy[col_time].max()
+#     df_train = df_all[df_all[col_time]<df_accuracy_beginTime][features+[col_time,label]]
+#     df_train_beginTime = df_train[col_time].min()
+#     df_train_endTime = df_train[col_time].max()
+#     text_content = f"训练数据时间范围:{df_train_beginTime} 至 {df_train_endTime},共{df_train.shape[0]}条记录,测试集数据时间范围:{df_accuracy_beginTime} 至 {df_accuracy_endTime}。<br>lightgbm模型参数:{model.params}"
+#     return text_content
+#
+#
+#
+# def draw_global_scatter(df,args):
+#     # --- 1. 实际功率和辐照度的散点图 ---
+#     col_x = args['scatter_col_x']
+#     col_y = args['label']
+#     scatter_fig = px.scatter(
+#         df,
+#         x=col_x,
+#         y=col_y,
+#         title=f"{col_x}和{col_y}的散点图",
+#         labels={"辐照度": "辐照度 (W/m²)", "实际功率": "实际功率 (kW)"}
+#     )
+#     return scatter_fig
+#
+#
+#
+# def draw_corr(df,features,args):
+#
+#     # --- 2. 相关性热力图 ---
+#     # 计算相关性矩阵
+#     label = args['label']
+#     features_coor = features+[label]
+#     corr_matrix = df[features_coor].corr()
+#     # 使用 Plotly Express 绘制热力图
+#     heatmap_fig = px.imshow(corr_matrix,
+#                     text_auto=True,  # 显示数值
+#                     color_continuous_scale='RdBu',  # 配色方案
+#                     title="Correlation Heatmap")
+#     heatmap_fig.update_coloraxes(showscale=False)
+#
+#     return heatmap_fig
+#
+# def draw_feature_importance(model,features):
+#     # --- 3. 特征重要性排名 ---
+#     # 获取特征重要性
+#     importance = model.feature_importance()  # 'split' 或 'gain',根据需求选择
+#
+#     # 转换为 DataFrame 方便绘图
+#     feature_importance_df = pd.DataFrame({
+#         'Feature': features,
+#         'Importance': importance
+#     })
+#     feature_importance_df = feature_importance_df.sort_values(by='Importance', ascending=False)
+#
+#     # 使用 Plotly Express 绘制条形图
+#     importance_fig = px.bar(feature_importance_df, x='Feature', y='Importance',
+#                  title="特征重要性排名",
+#                  labels={'Feature': '特征', 'Importance': '重要性'},
+#                  color='Importance',
+#                  color_continuous_scale='Viridis')
+#     # 更新每个 trace,确保没有图例
+#
+#     importance_fig.update_layout(title="模型特征重要性排名",
+#                                  showlegend=False  # 移除图例
+#                                 )
+#     importance_fig.update_coloraxes(showscale=False)
+#     return importance_fig
+#
+#
+# def draw_data_info_table(content):
+#     # --- 4. 创建数据说明的表格 ---
+#     # 转换为表格格式:1行1列,且填充文字说明
+#     # 转换为表格格式
+#     # 创建一个空的图
+#     table_fig = go.Figure()
+#
+#     # 第一部分: 显示文字说明
+#     table_fig.add_trace(go.Table(
+#         header=dict(
+#             values=["说明"],  # 表格只有一列:说明
+#             fill_color="paleturquoise",
+#             align="center"
+#         ),
+#         cells=dict(
+#             values=[[content]] ,  # 第一行填入文本说明
+#             fill_color="lavender",
+#             align="center"
+#         )
+#     ))
+#
+#
+#     return table_fig
+#
+#
+#
+# def draw_accuracy_table(df,content):
+#
+#     # --- 4. 每日的准确率表格 ---
+#     # 转换为表格格式
+#     table_fig = go.Figure(
+#         data=[
+#             go.Table(
+#                 header=dict(
+#                     values=list(df.columns),
+#                     fill_color="paleturquoise",
+#                     align="center"
+#                 ),
+#                 cells=dict(
+#                     values=[df[col] for col in df.columns],
+#                     fill_color="lavender",
+#                     align="center"
+#                 )
+#             )
+#         ]
+#     )
+#     table_fig.update_layout(title="准确率表", showlegend=False)
+#     return table_fig
+#
+#
+# @app.route('/analysis_report', methods=['POST'])
+# def analysis_report():
+#     start_time = time.time()
+#     result = {}
+#     success = 0
+#     path = ""
+#     print("Program starts execution!")
+#     try:
+#         args = request.values.to_dict()
+#         print('args',args)
+#         logger.info(args)
+#         #获取数据
+#         df_all, df_accuracy, model = get_data_from_mongo(args)
+#         features = model.feature_name()
+#         text_content = draw_info(df_all,df_accuracy,model,features,args)
+#         text_fig,scatter_fig,heatmap_fig,importance_fig,table_fig=draw_data_info_table(text_content),draw_global_scatter(df_all,args),draw_corr(df_all,features,args),draw_feature_importance(model,features),\
+#         draw_accuracy_table(df_accuracy,text_content)
+#         # --- 合并图表并保存到一个 HTML 文件 ---
+#         # 创建子图布局
+#         combined_fig = make_subplots(
+#             rows=5, cols=1,
+#             subplot_titles=["数据-模型概览","辐照度和实际功率的散点图", "相关性","特征重要性排名", "准确率表"],
+#             row_heights=[0.3, 0.6, 0.6, 0.6, 0.4],
+#             specs=[[{"type": "table"}], [{"type": "xy"}], [{"type": "heatmap"}], [{"type": "xy"}],[{"type": "table"}]]  # 指定每个子图类型
+#         )
+#         # 添加文本信息到子图(第一行)
+#         # 添加文字说明
+#         for trace in text_fig.data:
+#             combined_fig.add_trace(trace, row=1, col=1)
+#
+#         # 添加散点图
+#         for trace in scatter_fig.data:
+#             combined_fig.add_trace(trace, row=2, col=1)
+#
+#         # 添加相关性热力图
+#         for trace in heatmap_fig.data:
+#             combined_fig.add_trace(trace, row=3, col=1)
+#
+#         # 添加特征重要性排名图
+#         for trace in importance_fig.data:
+#             combined_fig.add_trace(trace, row=4, col=1)
+#
+#         # 添加表格
+#         for trace in table_fig.data:
+#             combined_fig.add_trace(trace, row=5, col=1)
+#
+#         # 更新布局
+#         combined_fig.update_layout(
+#         height=1500,
+#         title_text="分析结果汇总",  # 添加换行符以适应文本内容
+#         title_x=0.5,  # 中心对齐标题
+#         showlegend=False,
+#         )
+#         combined_fig.update_coloraxes(showscale=False)
+#         filename = f"{int(time.time() * 1000)}_{random.randint(1000, 9999)}.html"
+#         # 保存为 HTML
+#         directory = '/usr/share/nginx/html'
+#         if not os.path.exists(directory):
+#             os.makedirs(directory)
+#         file_path = os.path.join(directory, filename)
+#         # combined_fig.write_html(f"D://usr//{filename}")
+#         combined_fig.write_html(file_path)
+#         path = f"http://ds2:10093/{filename}"
+#         success = 1
+#     except Exception as e:
+#         my_exception = traceback.format_exc()
+#         my_exception.replace("\n","\t")
+#         result['msg'] = my_exception
+#     end_time = time.time()
+#     result['success'] = success
+#     result['args'] = args
+#     result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+#     result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+#     result['file_path'] = path
+#     print("Program execution ends!")
+#     return result
+#
+#
+# if __name__=="__main__":
+#     # print("Program starts execution!")
+#     # logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+#     # logger = logging.getLogger("analysis_report log")
+#     # from waitress import serve
+#     # serve(app, host="0.0.0.0", port=10092)
+#     # print("server start!")

+ 303 - 0
evaluation_processing/analysis_report.py

@@ -0,0 +1,303 @@
+# -*- coding: utf-8 -*-
+from plotly.subplots import make_subplots
+from flask import Flask,request
+import time
+import random
+import logging
+import traceback
+import os
+from common.database_dml import get_df_list_from_mongo,insert_data_into_mongo
+import plotly.express as px
+import plotly.graph_objects as go
+import pandas as pd
+import plotly.io as pio
+
+app = Flask('analysis_report——service')
+
+def put_analysis_report_to_html(args,df_clean,df_accuracy):
+    col_time = args['col_time']
+    col_x_env = args['col_x_env']
+    col_x_pre = args['col_x_pre']
+    label = args['label']
+    label_pre = args['label_pre']
+    farmId =  args['farmId']
+    df_overview = pd.DataFrame(
+        {'数据开始时间': [df_clean[col_time].min()], '数据结束时间': [df_clean[col_time].max()], '数据总记录数': [df_clean.shape[0]]})
+    overview_html = df_overview.to_html(classes='table table-bordered table-striped', index=False)
+
+    # -------------------- 数据描述 --------------------
+    describe_html = df_clean.describe().reset_index().rename(columns={'index': '统计量'}).to_html(
+        classes='table table-bordered table-striped', index=False)
+
+    # -------------------- 实测气象与实际功率散点图--------------------
+
+    # 生成实际功率与辐照度的散点图
+    fig_scatter = px.scatter(df_clean, x=col_x_env, y=label)
+
+    # 自定义散点图布局
+    fig_scatter.update_layout(
+        template='seaborn',
+        plot_bgcolor='rgba(255, 255, 255, 0.8)',  # 背景色
+        xaxis=dict(
+            showgrid=True,
+            gridcolor='rgba(200, 200, 200, 0.5)',
+            title=col_x_env
+        ),
+        yaxis=dict(
+            showgrid=True,
+            gridcolor='rgba(200, 200, 200, 0.5)',
+            title=label
+        ),
+        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.7)', bordercolor='black', borderwidth=1)
+    )
+
+    # 将散点图保存为 HTML 片段
+    scatter_html = pio.to_html(fig_scatter, full_html=False)
+
+    # -------------------- 生成相关性热力图 --------------------
+
+    # 计算相关矩阵
+    correlation_matrix = df_clean.corr()
+
+    # 生成热力图,带数值标签和新配色
+    fig_heatmap = go.Figure(data=go.Heatmap(
+        z=correlation_matrix.values,
+        x=correlation_matrix.columns,
+        y=correlation_matrix.columns,
+        colorscale='RdBu',  # 使用红蓝配色:正相关为蓝色,负相关为红色
+        text=correlation_matrix.round(2).astype(str),  # 将相关性值保留两位小数并转换为字符串
+        texttemplate="%{text}",  # 显示数值标签
+        colorbar=dict(title='Correlation'),
+        zmin=-1, zmax=1  # 设置颜色映射的范围
+    ))
+
+    # 自定义热力图布局
+    fig_heatmap.update_layout(
+        # title='Correlation Matrix Heatmap',
+        xaxis=dict(tickangle=45),
+        yaxis=dict(autorange='reversed'),
+        template='seaborn'
+    )
+
+    # 将热力图保存为 HTML 片段
+    corr_html = pio.to_html(fig_heatmap, full_html=False)
+
+    # -------------------- 实测气象与预测气象趋势曲线 --------------------
+
+    # 生成折线图(以 C_GLOBALR 和 NWP预测总辐射 为例)
+    fig_line = px.line(df_clean, x=col_time, y=[col_x_env, col_x_pre], markers=True)
+
+    # 自定义趋势图布局
+    fig_line.update_layout(
+        template='seaborn',
+        # title=dict(text=f"{col_x_env}与{col_x_pre}趋势曲线",
+        # x=0.5, font=dict(size=24, color='darkblue')),
+        plot_bgcolor='rgba(255, 255, 255, 0.8)',  # 改为白色背景
+        xaxis=dict(
+            showgrid=True,
+            gridcolor='rgba(200, 200, 200, 0.5)',  # 网格线颜色
+            rangeslider=dict(visible=True),  # 显示滚动条
+            rangeselector=dict(visible=True)  # 显示预设的时间范围选择器
+        ),
+        yaxis=dict(showgrid=True, gridcolor='rgba(200, 200, 200, 0.5)'),
+        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.7)', bordercolor='black', borderwidth=1)
+    )
+
+    # 将折线图保存为 HTML 片段
+    env_pre_html = pio.to_html(fig_line, full_html=False)
+
+    # -------------------- 实测气象与预测气象偏差密度曲线 --------------------
+
+    df_clean['deviation'] = df_clean[col_x_pre] - df_clean[col_x_env]
+    # 生成预测与实测辐照度偏差的密度曲线图
+    # 生成偏差的密度图
+    fig_density = px.histogram(df_clean, x='deviation', nbins=30, marginal='rug', opacity=0.75,
+                               histnorm='density')
+
+    # 自定义密度曲线图布局
+    fig_density.update_layout(
+        template='seaborn',
+        # # title=dict(text=f"{col_x_pre}与{col_x_env}偏差密度曲线",
+        # x=0.5, font=dict(size=24, color='darkred')),
+        plot_bgcolor='rgba(255, 255, 255, 0.8)',
+        xaxis=dict(
+            showgrid=True,
+            gridcolor='rgba(200, 200, 200, 0.5)',
+            title='偏差'
+        ),
+        yaxis=dict(
+            showgrid=True,
+            gridcolor='rgba(200, 200, 200, 0.5)',
+            title='Density'
+        ),
+        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.7)', bordercolor='black', borderwidth=1)
+    )
+
+    # 将密度曲线图保存为 HTML 片段
+    density_html = pio.to_html(fig_density, full_html=False)
+
+    # -------------------- 预测功率与实际功率曲线 --------------------
+
+    # 生成折线图(以 C_GLOBALR 和 NWP预测总辐射 为例)
+    fig_line = px.line(df_clean, x='dateTime', y=[label, label_pre], markers=True)
+
+    # 自定义趋势图布局
+    fig_line.update_layout(
+        template='seaborn',
+        # title=dict(text=f"{label_pre}与{label}曲线",
+        # x=0.5, font=dict(size=24, color='darkblue')),
+        plot_bgcolor='rgba(255, 255, 255, 0.8)',  # 改为白色背景
+        xaxis=dict(
+            showgrid=True,
+            gridcolor='rgba(200, 200, 200, 0.5)',  # 网格线颜色
+            rangeslider=dict(visible=True),  # 显示滚动条
+            rangeselector=dict(visible=True)  # 显示预设的时间范围选择器
+        ),
+        yaxis=dict(showgrid=True, gridcolor='rgba(200, 200, 200, 0.5)'),
+        legend=dict(x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.7)', bordercolor='black', borderwidth=1)
+    )
+
+    # 将折线图保存为 HTML 片段
+    power_html = pio.to_html(fig_line, full_html=False)
+
+    # -------------------- 准确率表展示--------------------
+    acc_html = df_accuracy.to_html(classes='table table-bordered table-striped', index=False)
+    # -------------------- 生成完整 HTML 页面 --------------------
+
+    html_content = f"""
+    <!DOCTYPE html>
+    <html lang="en">
+    <head>
+        <meta charset="UTF-8">
+        <meta name="viewport" content="width=device-width, initial-scale=1.0">
+        <title>Data Analysis Report</title>
+        <!-- 引入 Bootstrap CSS -->
+        <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
+        <style>
+            body {{
+                background-color: #f4f4f9;
+                font-family: Arial, sans-serif;
+                padding: 20px;
+            }}
+            .container {{
+                background-color: #fff;
+                padding: 20px;
+                border-radius: 10px;
+                box-shadow: 0 4px 8px rgba(0, 0, 0, 0.1);
+                margin-bottom: 30px;
+            }}
+           h1 {{
+                text-align: center;
+                color: #333;
+                margin-bottom: 20px;
+            }}
+            .plot-container {{
+                margin: 20px 0;
+                max-height: 500px;  /* 限制高度 */
+                overflow-y: auto;   /* 显示垂直滚动条 */
+            }}
+            .table-container {{
+                margin-top: 30px;
+                overflow-x: auto;   /* 水平滚动条 */
+                max-width: 100%;     /* 限制宽度 */
+                white-space: nowrap; /* 防止内容换行 */
+            }}
+            table {{
+                width: 100%;
+                font-size: 12px;  /* 设置字体大小为12px */
+            }}
+            th, td {{
+                text-align: center;  /* 表头和单元格文字居中 */
+            }}
+        </style>
+    </head>
+    <body>
+        <div class="container">
+            <h1>分析报告</h1>
+            <!-- Pandas DataFrame 表格 -->
+            <div class="table-container">
+                <h2>1. 数据总览</h2>
+                {overview_html}
+            </div>
+            <!-- Pandas DataFrame 表格 -->
+            <div class="table-container">
+                <h2>2. 数据描述</h2>
+                {describe_html}
+            </div>
+            <div class="plot-container">
+                <h2>3. 数据清洗后实测气象与实际功率散点图</h2>
+                {scatter_html}
+            </div>
+            <div class="plot-container">
+                <h2>4. 相关性分析</h2>
+                {corr_html}
+            </div>
+            <div class="plot-container">
+                <h2>5. 实测气象与预测气象曲线趋势</h2>
+                {env_pre_html}
+            </div>
+            <div class="plot-container">
+                <h2>6. 预测气象与实测气象偏差曲线</h2>
+                {density_html}
+            </div>
+            <div class="plot-container">
+                <h2>7. 预测功率与实际功率曲线对比</h2>
+                {power_html}
+            </div>
+            <!-- Pandas DataFrame 表格 -->
+            <div class="table-container">
+                <h2>8. 准确率对比</h2>
+                {acc_html}
+            </div>
+        </div>
+    </body>
+    </html>
+    """
+    filename = f"{farmId}_{int(time.time() * 1000)}_{random.randint(1000, 9999)}.html"
+    # 保存为 HTML
+    directory = '/data/html'
+    if not os.path.exists(directory):
+        os.makedirs(directory)
+    file_path = os.path.join(directory, filename)
+    path = f"http://ds3:10010/{filename}"
+    # 将 HTML 内容写入文件
+    with open(file_path, "w", encoding="utf-8") as f:
+        f.write(html_content)
+    print("HTML report generated successfully!")
+    return path
+@app.route('/analysis_report', methods=['POST'])
+def analysis_report():
+    start_time = time.time()  
+    result = {}
+    success = 0
+    path = ""
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        print('args',args)
+        logger.info(args)
+        #获取数据
+        df_clean, df_accuracy = get_df_list_from_mongo(args)[0], get_df_list_from_mongo(args)[1]
+        path = put_analysis_report_to_html(args, df_clean, df_accuracy)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc()
+        my_exception.replace("\n","\t")
+        result['msg'] = my_exception
+    end_time = time.time() 
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    result['file_path'] = path
+    print("Program execution ends!")
+    return result
+
+
+if __name__=="__main__":  
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("analysis_report log")
+    from waitress import serve
+    serve(app, host="0.0.0.0", port=10092)
+    print("server start!")

+ 1 - 33
models_processing/model_predict/model_prediction_lightgbm.py

@@ -5,40 +5,8 @@ from flask import Flask,request
 import time
 import logging
 import traceback
-
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo
 app = Flask('model_prediction_lightgbm——service')
-
-
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table,timeBegin,timeEnd = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'],args['timeBegin'],args['timeEnd']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    query = {"dateTime": {"$gte": timeBegin, "$lte": timeEnd}}
-    cursor = collection.find(query)
-    data = list(cursor)
-    df = pd.DataFrame(data)
-    # 4. 删除 _id 字段(可选)
-    if '_id' in df.columns:
-        df = df.drop(columns=['_id'])
-    client.close()
-    return df
-    
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
     
 
 def model_prediction(df,args):

+ 5 - 82
models_processing/model_predict/model_prediction_lstm.py

@@ -1,78 +1,13 @@
-import pandas as pd
-from pymongo import MongoClient
 from flask import Flask,request
 import time
 import logging
 import traceback
-from io import BytesIO
-import joblib
 import numpy as np
-import h5py
-import tensorflow as tf
 from itertools import chain
-
-
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo,get_h5_model_from_mongo,get_scaler_model_from_mongo
 app = Flask('model_prediction_lstm——service')
 
 
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table,timeBegin,timeEnd = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'],args['timeBegin'],args['timeEnd']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    query = {"dateTime": {"$gte": timeBegin, "$lte": timeEnd}}
-    cursor = collection.find(query)
-    data = list(cursor)
-    df = pd.DataFrame(data)
-    # 4. 删除 _id 字段(可选)
-    if '_id' in df.columns:
-        df = df.drop(columns=['_id'])
-    client.close()
-    return df
-    
-
-def insert_data_into_mongo(res_df,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    # 将 DataFrame 转为字典格式
-    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
-    # 插入到 MongoDB
-    collection.insert_many(data_dict)
-    print("data inserted successfully!")
-
-
-def get_model_from_mongo(args):
-    mongodb_connection,mongodb_database,model_table,model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['model_table'],args['model_name']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[model_table]  # 集合名称
-
-     # 查询 MongoDB 获取模型数据
-    model_doc = collection.find_one({"model_name": model_name})
-    if model_doc:
-        model_data = model_doc['model_data']  # 获取模型的二进制数据
-        # 将二进制数据加载到 BytesIO 缓冲区
-        model_buffer = BytesIO(model_data)
-        # 从缓冲区加载模型
-         # 使用 h5py 和 BytesIO 从内存中加载模型
-        with h5py.File(model_buffer, 'r') as f:
-            model = tf.keras.models.load_model(f)
-        print(f"{model_name}模型成功从 MongoDB 加载!")
-        client.close()
-        return model
-    else:
-        print(f"未找到model_name为 {model_name} 的模型。")
-        client.close()
-        return None
-
-
 # 创建时间序列数据
 def create_sequences(data_features,data_target,time_steps):
     X, y = [], []
@@ -86,28 +21,16 @@ def create_sequences(data_features,data_target,time_steps):
                 y.append(data_target[i + time_steps -1])
         return np.array(X), np.array(y)
 
-def model_prediction(df,args):
-
-    mongodb_connection, mongodb_database, scaler_table, features, time_steps, col_time = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
-                                        args['mongodb_database'], args['scaler_table'], str_to_list(args['features']),int(args['time_steps']),args['col_time'])
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[scaler_table]  # 集合名称
-    # Retrieve the scalers from MongoDB
-    scaler_doc = collection.find_one()
-    # Deserialize the scalers
 
-    feature_scaler_bytes = BytesIO(scaler_doc["feature_scaler"])
-    feature_scaler = joblib.load(feature_scaler_bytes)
-    target_scaler_bytes = BytesIO(scaler_doc["target_scaler"])
-    target_scaler = joblib.load(target_scaler_bytes)
+def model_prediction(df,args):
+    features, time_steps, col_time =  str_to_list(args['features']), int(args['time_steps']),args['col_time']
+    feature_scaler,target_scaler = get_scaler_model_from_mongo(args)
     df = df.fillna(method='ffill').fillna(method='bfill').sort_values(by=col_time)
     scaled_features = feature_scaler.transform(df[features])
     X_predict, _ = create_sequences(scaled_features, [], time_steps)
     # 加载模型时传入自定义损失函数
     # model = load_model(f'{farmId}_model.h5', custom_objects={'rmse': rmse})
-    model = get_model_from_mongo(args)
+    model = get_h5_model_from_mongo(args)
     y_predict = list(chain.from_iterable(target_scaler.inverse_transform([model.predict(X_predict).flatten()])))
     result = df[-len(y_predict):]
     result['predict'] = y_predict

+ 4 - 40
models_processing/model_train/model_training_lightgbm.py

@@ -1,43 +1,13 @@
 import lightgbm as lgb
-import pandas as pd 
 import numpy as np
-from pymongo import MongoClient
-import pickle
 from sklearn.model_selection import train_test_split
 from sklearn.metrics import mean_squared_error,mean_absolute_error
 from flask import Flask,request
 import time
 import traceback
 import logging
-
+from common.database_dml import get_data_from_mongo,insert_pickle_model_into_mongo
 app = Flask('model_training_lightgbm——service')
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table,timeBegin,timeEnd = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'],args['timeBegin'],args['timeEnd']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    query = {"dateTime": {"$gte": timeBegin, "$lte": timeEnd}}
-    cursor = collection.find(query)
-    data = list(cursor)
-    df = pd.DataFrame(data)
-    # 4. 删除 _id 字段(可选)
-    if '_id' in df.columns:
-        df = df.drop(columns=['_id'])
-    client.close()
-    return df
-    
-
-def insert_model_into_mongo(model_data,args):
-    mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    if mongodb_write_table in db.list_collection_names():
-        db[mongodb_write_table].drop()
-        print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
-    collection = db[mongodb_write_table]  # 集合名称
-    collection.insert_one(model_data)
-    print("model inserted successfully!")
 
 
 def build_model(df,args):
@@ -75,14 +45,8 @@ def build_model(df,args):
     mae = mean_absolute_error(y_test, y_pred)
     print(f'The test rmse is: {rmse},"The test mae is:"{mae}')
     
-    # 序列化模型
-    model_bytes = pickle.dumps(gbm)
-    model_data = {
-                'model_name': model_name,
-                'model': model_bytes,     #将模型字节流存入数据库
-        }
-    print('Training completed!')
-    return model_data
+
+    return gbm
 
 
 def str_to_list(arg):
@@ -105,7 +69,7 @@ def model_training_lightgbm():
         logger.info(args)
         power_df = get_data_from_mongo(args)
         model = build_model(power_df,args)
-        insert_model_into_mongo(model,args)
+        insert_pickle_model_into_mongo(model,args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()

+ 14 - 60
models_processing/model_train/model_training_lstm.py

@@ -12,68 +12,22 @@ import joblib
 from tensorflow.keras.models import Sequential
 from tensorflow.keras.layers import LSTM, Dense
 from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
-import matplotlib.pyplot as plt
+# import matplotlib.pyplot as plt
 import tensorflow as tf
-
+from common.database_dml import get_data_from_mongo,insert_h5_model_into_mongo
 
 app = Flask('model_training_lightgbm——service')
 
-def draw_loss(history):
-    #绘制训练集和验证集损失
-    plt.figure(figsize=(20, 8))
-    plt.plot(history.history['loss'], label='Training Loss')
-    plt.plot(history.history['val_loss'], label='Validation Loss')
-    plt.title('Loss Curve')
-    plt.xlabel('Epochs')
-    plt.ylabel('Loss')
-    plt.legend()
-    plt.show()
-
-def get_data_from_mongo(args):
-    mongodb_connection,mongodb_database,mongodb_read_table,timeBegin,timeEnd = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'],args['timeBegin'],args['timeEnd']
-    client = MongoClient(mongodb_connection)
-    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
-    db = client[mongodb_database]
-    collection = db[mongodb_read_table]  # 集合名称
-    query = {"dateTime": {"$gte": timeBegin, "$lte": timeEnd}}
-    cursor = collection.find(query)
-    data = list(cursor)
-    df = pd.DataFrame(data)
-    # 4. 删除 _id 字段(可选)
-    if '_id' in df.columns:
-        df = df.drop(columns=['_id'])
-    client.close()
-    return df
-    
-
-def insert_model_into_mongo(model,feature_scaler_bytes,target_scaler_bytes ,args):
-    mongodb_connection,mongodb_database,scaler_table,model_table,model_name = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
-                                args['mongodb_database'],args['scaler_table'],args['model_table'],args['model_name'])
-    client = MongoClient(mongodb_connection)
-    db = client[mongodb_database]
-    collection = db[scaler_table]  # 集合名称
-    # Save the scalers in MongoDB as binary data
-    collection.insert_one({
-        "feature_scaler": feature_scaler_bytes.read(),
-        "target_scaler": target_scaler_bytes.read()
-    })
-    print("model inserted successfully!")
-    model_table = db[model_table]
-    # 创建 BytesIO 缓冲区
-    model_buffer = BytesIO()
-    # 将模型保存为 HDF5 格式到内存 (BytesIO)
-    model.save(model_buffer, save_format='h5')
-    # 将指针移到缓冲区的起始位置
-    model_buffer.seek(0)
-    # 获取模型的二进制数据
-    model_data = model_buffer.read()
-    # 将模型保存到 MongoDB
-    model_table.insert_one({
-        "model_name": model_name,
-        "model_data": model_data
-    })
-    print("模型成功保存到 MongoDB!")
-
+# def draw_loss(history):
+#     #绘制训练集和验证集损失
+#     plt.figure(figsize=(20, 8))
+#     plt.plot(history.history['loss'], label='Training Loss')
+#     plt.plot(history.history['val_loss'], label='Validation Loss')
+#     plt.title('Loss Curve')
+#     plt.xlabel('Epochs')
+#     plt.ylabel('Loss')
+#     plt.legend()
+#     plt.show()
 
 def rmse(y_true, y_pred):
     return tf.math.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))
@@ -130,7 +84,7 @@ def build_model(data, args):
                         validation_data=(X_test, y_test),
                         verbose=2,
                         callbacks=[early_stopping, reduce_lr])
-    draw_loss(history)
+    # draw_loss(history)
     return model,feature_scaler_bytes,target_scaler_bytes
 
 
@@ -154,7 +108,7 @@ def model_training_lstm():
         logger.info(args)
         power_df = get_data_from_mongo(args)
         model,feature_scaler_bytes,target_scaler_bytes = build_model(power_df,args)
-        insert_model_into_mongo(model,feature_scaler_bytes,target_scaler_bytes ,args)
+        insert_h5_model_into_mongo(model,feature_scaler_bytes,target_scaler_bytes ,args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()

+ 1 - 1
run_all.py

@@ -9,7 +9,7 @@ services = [
     ("data_processing/processing_limit_power/processing_limit_power_by_records.py", 10088),
     ("data_processing/processing_limit_power/processing_limit_power_by_statistics_light.py", 10085),
     ("data_processing/processing_limit_power/processing_limit_power_by_statistics_wind.py", 10093),
-    ("evaluation_processing/analysis.py", 10092),
+    ("evaluation_processing/analysis_report.py", 10092),
     ("evaluation_processing/evaluation_accuracy.py", 10091),
     ("models_processing/model_train/model_training_lightgbm.py", 10089),
     ("models_processing/model_predict/model_prediction_lightgbm.py", 10090),