# -*- coding: utf-8 -*-
"""Flask service that renders an analysis report page.

``POST /analysis_report`` loads three DataFrames through
``get_df_list_from_mongo``, builds tables and Plotly figures from them, writes
the resulting page into ``REPORT_DIRECTORY`` and returns the URL of that file.
"""
import logging
import numbers
import os
import random
import time
import traceback

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import plotly.io as pio
from bson.decimal128 import Decimal128
from flask import Flask, request
from scipy.stats import gaussian_kde

from common.database_dml import get_df_list_from_mongo, insert_data_into_mongo
from common.processing_data_common import str_to_list, generate_unique_colors

app = Flask('analysis_report——service')

# Defined at module level so the request handler can log no matter how the
# app is launched (the handlers/format are still configured in ``__main__``).
logger = logging.getLogger("analysis_report log")

# Directory the report files are written to, and the URL prefix returned to
# the caller for a file of the same name.
REPORT_DIRECTORY = '/usr/share/nginx/html'
REPORT_BASE_URL = "http://ds3:10010"

_TABLE_CLASSES = 'table table-bordered table-striped'
_GRID_COLOR = 'rgba(200, 200, 200, 0.5)'
_PLOT_BGCOLOR = 'rgba(255, 255, 255, 0.8)'
_LEGEND_BGCOLOR = 'rgba(255, 255, 255, 0.7)'

# Values written into / compared against the ``is_limit`` column.
_NORMAL_POINT = '正常点'
_ABNORMAL_POINT = '异常点'

# (source column, aggregation, output column names) for the accuracy summary.
# The order fixes the column order of the summary table.
_SUMMARY_SPECS = (
    ('MAE', np.nanmean, ['MAE平均值']),
    ('accuracy', np.nanmean, ['准确率平均值']),
    ('RMSE', np.nanmean, ['RMSE平均值']),
    ('deviationElectricity', [np.nanmean, np.nansum], ['考核电量平均值', '考核总电量']),
    ('deviationAssessment', [np.nanmean, np.nansum], ['考核分数平均值', '考核总分数']),
)


def _to_plain_float(value):
    """Convert Decimal128 and numeric values to ``float``; pass anything else through."""
    if isinstance(value, Decimal128):
        return float(value.to_decimal())
    if isinstance(value, numbers.Number):
        return float(value)
    return value


def _to_float_or_nan(value):
    """Convert Decimal128 and numeric values to ``float``; map anything else to NaN."""
    if isinstance(value, Decimal128):
        return float(value.to_decimal())
    if isinstance(value, numbers.Number):
        return float(value)
    return np.nan


def _normalise_frame(df, col_time):
    """Return a copy of ``df`` with numeric cells as floats, sorted by ``col_time``."""
    return df.applymap(_to_plain_float).sort_values(by=col_time)


def _safe_filename_part(value):
    """Replace every character that is not alphanumeric or one of ``-_.`` with ``_``.

    The value comes from the request and ends up in a file name, so path
    separators must never reach ``os.path.join``.
    """
    return "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in str(value))


def _scatter_html(df_clean, col_x_env, label, color_column):
    """Render the ``col_x_env`` vs ``label`` scatter plot as an HTML fragment."""
    fig_scatter = px.scatter(df_clean, x=col_x_env, y=label, color=color_column)
    fig_scatter.update_layout(
        template='seaborn',
        plot_bgcolor=_PLOT_BGCOLOR,
        xaxis=dict(
            showgrid=True,
            gridcolor=_GRID_COLOR,
            title=col_x_env,
            title_font=dict(size=14),
            tickfont=dict(size=12),
        ),
        yaxis=dict(
            showgrid=True,
            gridcolor=_GRID_COLOR,
            title=label,
            title_font=dict(size=14),
            tickfont=dict(size=12),
        ),
        legend=dict(
            x=0.01, y=0.99,
            bgcolor=_LEGEND_BGCOLOR,
            bordercolor='black',
            borderwidth=1,
            font=dict(size=12),
        ),
        title=dict(
            x=0.5,  # centred; no title text is set
            font=dict(size=16),
        ),
    )
    return pio.to_html(fig_scatter, full_html=False)


def _correlation_html(df_clean_after):
    """Render the correlation heatmap of the numeric columns as an HTML fragment."""
    # Only numeric columns can be correlated; the frame also carries the
    # string ``is_limit`` labels and possibly a non-numeric time column.
    correlation_matrix = df_clean_after.select_dtypes(include=[np.number]).corr()
    fig_heatmap = go.Figure(data=go.Heatmap(
        z=correlation_matrix.values,
        x=correlation_matrix.columns,
        y=correlation_matrix.columns,
        colorscale='RdBu',
        text=correlation_matrix.round(2).astype(str),  # two-decimal cell labels
        texttemplate="%{text}",
        colorbar=dict(title='Correlation'),
        zmin=-1, zmax=1,
    ))
    fig_heatmap.update_layout(
        xaxis=dict(tickangle=45),
        yaxis=dict(autorange='reversed'),
        template='seaborn',
    )
    return pio.to_html(fig_heatmap, full_html=False)


def _trend_html(df_clean, col_time, label, col_x_env, col_x_pre):
    """Render the time-series line chart of ``label`` and the weather columns.

    ``label`` is drawn against the left axis, every other column against a
    second axis on the right.
    """
    y_columns = [label] + [col_x_env] + col_x_pre
    fig_line = px.line(df_clean, x=col_time, y=y_columns, markers=True)
    fig_line.update_layout(
        template='seaborn',
        plot_bgcolor=_PLOT_BGCOLOR,
        xaxis=dict(
            showgrid=True,
            gridcolor=_GRID_COLOR,
            rangeslider=dict(visible=True),
            rangeselector=dict(visible=True),
        ),
        yaxis=dict(
            title="实际功率",
            showgrid=True,
            gridcolor=_GRID_COLOR,
        ),
        yaxis2=dict(
            title="环境数据",
            overlaying='y',
            side='right',
            showgrid=False,
        ),
        legend=dict(
            x=0.01, y=0.99,
            bgcolor=_LEGEND_BGCOLOR,
            bordercolor='black',
            borderwidth=1,
        ),
    )
    # Traces are matched to the requested columns by position.
    for trace, col in zip(fig_line.data, y_columns):
        trace.update(yaxis='y' if col == label else 'y2')
    return pio.to_html(fig_line, full_html=False)


def _density_html(df_clean, col_x_env, col_x_pre):
    """Render one KDE curve per column of ``col_x_pre`` for its deviation from ``col_x_env``."""
    fig_density = go.Figure()
    colors = generate_unique_colors(len(col_x_pre))
    for pre_col, color in zip(col_x_pre, colors):
        deviation = (df_clean[pre_col] - df_clean[col_x_env]).dropna()
        # gaussian_kde raises on fewer than two samples and on constant data
        # (singular covariance); skip the curve instead of failing the report.
        if deviation.shape[0] < 2 or deviation.nunique() < 2:
            logger.warning("skip density curve for %s: not enough distinct values", pre_col)
            continue
        kde = gaussian_kde(deviation)
        x_vals = np.linspace(deviation.min(), deviation.max(), 1000)
        fig_density.add_trace(go.Scatter(
            x=x_vals,
            y=kde(x_vals),
            mode='lines',
            fill='tozeroy',
            line=dict(color=color),
            name=f'Density {pre_col}',
        ))
    return pio.to_html(fig_density, full_html=False)


def _power_html(df_predict, col_time, label, label_pre):
    """Render ``label`` plus one ``label_pre`` curve per value of the ``model`` column."""
    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=df_predict[col_time],
        y=df_predict[label],
        mode='lines+markers',
        name='实际功率',
        line=dict(width=1),
        marker=dict(symbol='circle'),
    ))
    for model in df_predict['model'].unique():
        model_data = df_predict[df_predict['model'] == model]
        fig.add_trace(go.Scatter(
            x=model_data[col_time],
            y=model_data[label_pre],
            mode='lines+markers',
            name=f'{model} 预测值',
            marker=dict(symbol='circle'),
            line=dict(width=2),
        ))
    fig.update_layout(
        template='seaborn',
        title=dict(
            x=0.5,  # centred; no title text is set
            font=dict(size=20, color='darkblue'),
        ),
        plot_bgcolor=_PLOT_BGCOLOR,
        xaxis=dict(
            showgrid=True,
            gridcolor=_GRID_COLOR,
            title='时间',
            rangeslider=dict(visible=True),
            rangeselector=dict(visible=True),
        ),
        yaxis=dict(
            showgrid=True,
            gridcolor=_GRID_COLOR,
            title='功率',
        ),
        legend=dict(
            x=0.01, y=0.99,
            bgcolor=_LEGEND_BGCOLOR,
            bordercolor='black',
            borderwidth=1,
            font=dict(size=12),
        ),
        hovermode='x unified',
        hoverlabel=dict(
            bgcolor='white',
            font_size=14,
            font_family="Rockwell",
            bordercolor='black',
        ),
        margin=dict(l=50, r=50, t=50, b=50),
    )
    return pio.to_html(fig, full_html=False)


def _accuracy_summary_html(df_accuracy):
    """Render per-model aggregates of the metric columns that are present.

    Returns an empty string when none of the known metric columns exists,
    because ``groupby().agg`` cannot be called with an empty mapping.
    """
    frame = df_accuracy.copy()
    agg_dict = {}
    rename_cols = ['model']
    for column, aggregation, output_names in _SUMMARY_SPECS:
        if column not in frame.columns:
            continue
        frame[column] = frame[column].apply(_to_float_or_nan)
        agg_dict[column] = aggregation
        rename_cols.extend(output_names)
    if not agg_dict:
        return ''
    summary_df = frame.groupby('model').agg(agg_dict).reset_index()
    summary_df.columns = rename_cols
    return summary_df.to_html(classes=_TABLE_CLASSES, index=False)


def put_analysis_report_to_html(args, df_clean, df_predict, df_accuracy):
    """Build the analysis report, write it to ``REPORT_DIRECTORY`` and return its URL.

    Args:
        args: mapping with the keys ``col_time``, ``col_x_env``, ``col_x_pre``
            (parsed with ``str_to_list``), ``label``, ``label_pre`` and
            ``farmId``.
        df_clean: measured data; an optional ``is_limit`` column marks rows
            with ``0`` as normal points and anything else as abnormal points.
        df_predict: prediction data with ``model``, ``label`` and
            ``label_pre`` columns.
        df_accuracy: accuracy records; the two accuracy sections stay empty
            when this frame has no rows.

    Returns:
        The URL (``REPORT_BASE_URL`` + file name) of the written report.
    """
    col_time = args['col_time']
    col_x_env = args['col_x_env']
    col_x_pre = str_to_list(args['col_x_pre'])
    label = args['label']
    label_pre = args['label_pre']
    farmId = args['farmId']
    has_accuracy = df_accuracy.shape[0] > 0

    df_clean = _normalise_frame(df_clean, col_time)
    df_predict = _normalise_frame(df_predict, col_time)
    if has_accuracy:
        df_accuracy = _normalise_frame(df_accuracy, col_time)

    # Without an ``is_limit`` column every row counts as a normal point.
    total_size = df_clean.shape[0]
    has_limit_flag = 'is_limit' in df_clean.columns
    if has_limit_flag:
        df_clean['is_limit'] = df_clean['is_limit'].apply(
            lambda x: _NORMAL_POINT if x == 0 else _ABNORMAL_POINT)
        df_clean_after = df_clean[df_clean['is_limit'] == _NORMAL_POINT]
    else:
        df_clean_after = df_clean
    clean_size = df_clean_after.shape[0]

    # -------------------- 1. overview table --------------------
    time_min = df_clean[col_time].min()
    time_max = df_clean[col_time].max()
    df_overview = pd.DataFrame({
        '场站编码': [farmId],
        '数据开始时间': [time_min],
        '数据结束时间': [time_max],
        '总天数': [(pd.to_datetime(time_max) - pd.to_datetime(time_min)).days],
        '数据总记录数': [total_size],
        '清洗后记录数': [clean_size],
        '数据可用率': [clean_size / total_size if total_size else 0.0],
    })
    overview_html = df_overview.to_html(classes=_TABLE_CLASSES, index=False)

    # -------------------- 2. descriptive statistics --------------------
    describe_html = (
        df_clean.describe()
        .reset_index()
        .rename(columns={'index': '统计量'})
        .to_html(classes='table table-bordered table-striped fixed', index=False)
    )

    # -------------------- 3.-7. figures --------------------
    scatter_html = _scatter_html(
        df_clean, col_x_env, label, 'is_limit' if has_limit_flag else None)
    corr_html = _correlation_html(df_clean_after)
    env_pre_html = _trend_html(df_clean, col_time, label, col_x_env, col_x_pre)
    density_html = _density_html(df_clean, col_x_env, col_x_pre)
    power_html = _power_html(df_predict, col_time, label, label_pre)

    # -------------------- 8.-9. accuracy tables --------------------
    acc_html = ''
    summary_html = ''
    if has_accuracy:
        acc_html = df_accuracy.sort_values(by=col_time).to_html(
            classes=_TABLE_CLASSES, index=False)
        summary_html = _accuracy_summary_html(df_accuracy)

    # -------------------- assemble the page --------------------
    # NOTE(review): the template below contains no HTML markup; presumably the
    # tags were lost before this file was captured - confirm against the
    # deployed version before relying on the page layout.
    html_content = f""" Data Analysis Report

分析报告

1. 数据总览

{overview_html}

2. 数据描述

{describe_html}

3. 实测气象与实际功率散点图

{scatter_html}

4. 相关性分析

{corr_html}

5. 预测气象与实测气象偏差曲线

{density_html}

6. 实测气象与预测气象曲线趋势

{env_pre_html}

7. 预测功率与实际功率曲线对比

{power_html}

8. 准确率对比

公式
{acc_html}

9. 准确率汇总对比

{summary_html}
"""

    # ``farmId`` is request data: strip anything that could act as a path
    # component before it becomes part of the file name.
    filename = (
        f"{_safe_filename_part(farmId)}_{int(time.time() * 1000)}"
        f"_{random.randint(1000, 9999)}.html"
    )
    os.makedirs(REPORT_DIRECTORY, exist_ok=True)
    file_path = os.path.join(REPORT_DIRECTORY, filename)
    path = f"{REPORT_BASE_URL}/{filename}"
    with open(file_path, "w", encoding="utf-8") as f:
        f.write(html_content)
    print("HTML report generated successfully!")
    return path


@app.route('/analysis_report', methods=['POST'])
def analysis_report():
    """Handle ``POST /analysis_report``: build the report and describe the outcome.

    Returns:
        A dict with ``success`` (1 or 0), ``args`` (the request values),
        ``start_time`` / ``end_time`` (local time strings), ``file_path``
        (report URL, empty on failure) and, on failure only, ``msg`` with the
        traceback flattened onto one line.
    """
    start_time = time.time()
    result = {}
    success = 0
    path = ""
    args = {}  # stays empty if reading the request itself fails
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        # Query once and unpack, instead of one query per DataFrame.
        frames = get_df_list_from_mongo(args)
        df_clean, df_predict, df_accuracy = frames[0], frames[1], frames[2]
        path = put_analysis_report_to_html(args, df_clean, df_predict, df_accuracy)
        success = 1
    except Exception:
        # Service boundary: report the failure to the caller and keep serving.
        logger.exception("analysis_report failed")
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    result['file_path'] = path
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    from waitress import serve

    serve(app, host="0.0.0.0", port=10092)
    # NOTE: only reached once ``serve`` returns.
    print("server start!")