# -*- coding: utf-8 -*- import numpy as np import pandas as pd from flask import Flask, request import time import random import logging import traceback import os from matplotlib.pyplot import title from common.database_dml import get_df_list_from_mongo, insert_data_into_mongo import plotly.graph_objects as go from plotly.subplots import make_subplots import plotly.io as pio from bson.decimal128 import Decimal128 import numbers app = Flask('analysis_report——service') def create_fig(df_predict, col_time, label, label_pre, point): # 创建一个图表对象 fig = go.Figure() point_data = df_predict[df_predict['howLongAgo']==point] # 获取所有的模型 models = df_predict['model'].unique() # 添加实际功率曲线 fig.add_trace(go.Scatter( x=df_predict[col_time], y=df_predict[label], mode='lines+markers', name='实际功率', # 实际功率 line=dict(width=1), # 虚线 marker=dict(symbol='circle'), )) # 为每个模型添加预测值和实际功率的曲线 for model in models: # 筛选该模型的数据 model_data = point_data[point_data['model'] == model] # 添加预测值曲线 fig.add_trace(go.Scatter( x=model_data[col_time], y=model_data[label_pre], mode='lines+markers', name=f'{model} 预测值', # 预测值 marker=dict(symbol='circle'), line=dict(width=2) )) fig_name = '超短期-第{}点'.format(point) if point < 17 else '超短期-平均值' # 设置图表的标题和标签 fig.update_layout( template='seaborn', # 使用 seaborn 模板 title=dict( text=fig_name, # 标题 x=0.5, font=dict(size=20, color='darkblue') # 标题居中并设置字体大小和颜色 ), plot_bgcolor='rgba(255, 255, 255, 0.8)', # 背景色 xaxis=dict( showgrid=True, gridcolor='rgba(200, 200, 200, 0.5)', # 网格线颜色 title='时间', # 时间轴标题 rangeslider=dict(visible=True), # 显示滚动条 rangeselector=dict(visible=True) # 显示预设的时间范围选择器 ), yaxis=dict( showgrid=True, gridcolor='rgba(200, 200, 200, 0.5)', title='功率' # y轴标题 ), legend=dict( x=0.01, y=0.99, bgcolor='rgba(255, 255, 255, 0.7)', # 背景透明 bordercolor='black', borderwidth=1, font=dict(size=12) # 字体大小 ), hovermode='x unified', # 鼠标悬停时显示统一的提示框 hoverlabel=dict( bgcolor='white', font_size=14, font_family="Rockwell", # 设置字体样式 bordercolor='black' ), margin=dict(l=50, r=50, t=50, b=50) # 调整边距,避免标题或标签被遮挡 ) return fig def put_analysis_report_to_html(args, df_predict, df_accuracy): col_time = args['col_time'] label = args['label'] label_pre = args['label_pre'] farmId = args['farmId'] points = args['points'].split(',') cdq_title = '超短期分析报告' + args.get('title', '') acc_flag = df_accuracy.shape[0] # 获取所有的模型 models = df_predict['model'].unique() aves = [] # 添加超短期16个点平均值 for model in models: # 筛选该模型的数据 model_data = df_predict[df_predict['model'] == model] # 添加超短期16个点平均值 ave = model_data.groupby(col_time).agg({ label: 'first', 'model': 'first', label_pre: 'mean', 'farm_id': 'first' }).reset_index() ave['howLongAgo'] = 17 ave = ave.reindex(columns=df_predict.columns.tolist()) aves.append(ave) df_predict = pd.concat([df_predict]+aves) df_predict = df_predict.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x).sort_values(by=col_time) if acc_flag > 0: df_accuracy = df_accuracy.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x).sort_values(by=col_time) figs = [create_fig(df_predict, col_time, label, label_pre, int(p)) for p in points] # 将折线图保存为 HTML 片段 power_htmls = [pio.to_html(f, full_html=False) for f in figs] power_htmls = ["
{}
".format(html) for html in power_htmls] # -------------------- 准确率表展示-------------------- acc_html = '' if acc_flag > 0: acc_html = df_accuracy.sort_values(by=col_time).to_html(classes='table table-bordered table-striped', index=False) # -------------------- 准确率汇总展示-------------------- summary_html = '' if acc_flag > 0: # 指定需要转换的列 cols_to_convert = ['MAE', 'accuracy', 'RMSE', 'deviationElectricity', 'deviationAssessment'] for col in cols_to_convert: if col in df_accuracy.columns: df_accuracy[col] = df_accuracy[col].apply( lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else np.nan) # 确定存在的列 agg_dict = {} rename_cols = ['model'] if 'MAE' in df_accuracy.columns: agg_dict['MAE'] = np.nanmean rename_cols.append('MAE平均值') if 'accuracy' in df_accuracy.columns: agg_dict['accuracy'] = np.nanmean rename_cols.append('准确率平均值') if 'RMSE' in df_accuracy.columns: agg_dict['RMSE'] = np.nanmean rename_cols.append('RMSE平均值') if 'deviationElectricity' in df_accuracy.columns: agg_dict['deviationElectricity'] = [np.nanmean, np.nansum] rename_cols.append('考核电量平均值') rename_cols.append('考核总电量') if 'deviationAssessment' in df_accuracy.columns: agg_dict['deviationAssessment'] = [np.nanmean, np.nansum] rename_cols.append('考核分数平均值') rename_cols.append('考核总分数') if 'accuracyAssessment' in df_accuracy.columns: agg_dict['accuracyAssessment'] = [np.nanmean, np.nansum] rename_cols.append('考核分数平均值') rename_cols.append('考核总分数') # 进行分组聚合,如果有需要聚合的列 summary_df = df_accuracy.groupby('model').agg(agg_dict).reset_index() summary_df.columns = rename_cols summary_html = summary_df.to_html(classes='table table-bordered table-striped', index=False) # -------------------- 生成完整 HTML 页面 -------------------- html_content = f""" Data Analysis Report

{ cdq_title }

1. 预测功率与实际功率曲线对比

{''.join(power_htmls)}

2. 准确率对比

公式
{acc_html}

3. 准确率汇总对比

{summary_html}
""" filename = f"{farmId}_{int(time.time() * 1000)}_{random.randint(1000, 9999)}.html" # 保存为 HTML directory = '/usr/share/nginx/html' if not os.path.exists(directory): os.makedirs(directory) file_path = os.path.join(directory, filename) path = f"http://ds3:10010/{filename}" # 将 HTML 内容写入文件 with open(file_path, "w", encoding="utf-8") as f: f.write(html_content) print("HTML report generated successfully!") return path @app.route('/analysis_report_cdq', methods=['POST']) def analysis_report(): start_time = time.time() result = {} success = 0 path = "" print("Program starts execution!") try: args = request.values.to_dict() print('args', args) logger.info(args) # 获取数据 df_predict, df_accuracy = get_df_list_from_mongo(args)[0], get_df_list_from_mongo(args)[1] path = put_analysis_report_to_html(args, df_predict, df_accuracy) success = 1 except Exception as e: my_exception = traceback.format_exc() my_exception.replace("\n", "\t") result['msg'] = my_exception end_time = time.time() result['success'] = success result['args'] = args result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)) result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)) result['file_path'] = path print("Program execution ends!") return result if __name__ == "__main__": print("Program starts execution!") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("analysis_report log") from waitress import serve serve(app, host="0.0.0.0", port=10108) print("server start!") # args_dict = {"mongodb_database": 'db_cdq', 'mongodb_read_table': 'j00234_neu_overwrite,j00234_neu_res', 'col_time': 'dateTime', # 'label': 'C_REAL_VALUE', 'label_pre': 'power_forecast', 'farmId': 'j00234'} # df_predict, df_accuracy = get_df_list_from_mongo(args_dict)[0], get_df_list_from_mongo(args_dict)[1] # path = put_analysis_report_to_html(args_dict, df_predict, df_accuracy)