#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/4/12 18:57
# file: data_analyse.py
# author: David
# company: shenyang JY
import sys

import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

# NOTE(review): `timestr_to_datetime` is not defined in this file, so it is
# presumably supplied by this wildcard import -- confirm against data_utils.
from data_utils import *


class data_analyse(object):
    """Score model power predictions against reference forecasts and plot them.

    The class reads its settings from ``opt``, reports through ``logger`` and
    uses ``process`` (stored as ``self.ds``) to undo the scaling of the
    ``C_REAL_VALUE`` column.
    """

    def __init__(self, opt, logger, process):
        """Store the collaborators used by the analysis methods.

        Args:
            opt: Settings object. Attributes read by this class are
                ``excel_data_path``, ``data_format``, ``cap``,
                ``feature_columns``, ``label_in_feature_index``,
                ``label_columns``, ``do_figure_save``, ``figure_save_path``,
                ``continue_flag`` and ``used_frame``.
            logger: Object with an ``info(message)`` method.
            process: Object exposing ``std`` and ``mean``, both indexable by
                ``'C_REAL_VALUE'``.
        """
        self.opt = opt
        self.logger = logger
        self.ds = process

    def formula_acc(self):
        """Load the formula forecast CSV and keep one forecast lead only.

        The file path is ``opt.excel_data_path + opt.data_format["formula"]``.
        ``C_FORECAST_TIME`` is converted with ``timestr_to_datetime`` and
        renamed to ``C_TIME`` so the result can be merged on that column.

        Returns:
            pandas.DataFrame: Rows whose ``C_FORECAST_HOW_LONG_AGO`` equals 16,
            with columns ``C_ABLE_VALUE``, ``C_FORECAST_HOW_LONG_AGO`` and
            ``C_TIME``.
        """
        formula_path = self.opt.excel_data_path + self.opt.data_format["formula"]
        formula = pd.read_csv(
            formula_path,
            usecols=['C_ABLE_VALUE', 'C_FORECAST_HOW_LONG_AGO', 'C_FORECAST_TIME'],
        )
        formula["C_FORECAST_TIME"] = formula["C_FORECAST_TIME"].apply(timestr_to_datetime)
        formula = formula.rename(columns={"C_FORECAST_TIME": "C_TIME"})
        # NOTE(review): 16 presumably matches the 16-point window used by
        # get_16_points -- confirm the unit of C_FORECAST_HOW_LONG_AGO.
        return formula.loc[formula['C_FORECAST_HOW_LONG_AGO'] == 16]

    def calculate_acc(self, label_data, predict_data):
        """Return the accuracy ``1 - RMSE / opt.cap`` of a prediction.

        Args:
            label_data: Reference values; must support subtraction with
                ``predict_data`` and ``len()``.
            predict_data: Predicted values.

        Returns:
            ``1 - sqrt(sum((label - predict) ** 2) / len(label)) / opt.cap``.
        """
        mse = np.sum((label_data - predict_data) ** 2) / len(label_data)
        rmse = np.sqrt(mse)
        return 1 - rmse / self.opt.cap

    def get_16_points(self, results):
        """Collect the last row of every frame in ``results``.

        Args:
            results: Iterable of objects supporting ``.iloc[-1].values``
                (pandas DataFrames in ``predict_acc``).

        Returns:
            numpy.ndarray: One entry per input frame holding the values of
            its last row.
        """
        return np.array([res.iloc[-1].values for res in results])

    def predict_acc(self, predict_data, dfy):
        """Score the predictions, log the accuracies and draw the curves.

        Steps: rescale ``predict_data`` with the stored std/mean of
        ``C_REAL_VALUE``, attach each prediction to its frame as a ``PREDICT``
        column, keep the last row of every frame, write that table to
        ``opt.excel_data_path + "dq+rp.csv"``, merge it with the table from
        ``formula_acc`` on ``C_TIME`` and log the accuracy of ``PREDICT``,
        ``C_FP_VALUE`` and ``C_ABLE_VALUE`` against ``C_REAL_VALUE``.

        Args:
            predict_data: Scaled predictions, indexable by frame position.
            dfy: Sequence of lists of DataFrames. The frames receive a
                ``PREDICT`` column in place; the lists themselves are not
                modified.
        """
        predict_data = predict_data * self.ds.std['C_REAL_VALUE'] + self.ds.mean['C_REAL_VALUE']

        # Flatten into a fresh list. Extending dfy[0] directly (as the code
        # used to) silently grew the caller's first sub-list on every call.
        dfs = [df for group in dfy for df in group]
        for i, df in enumerate(dfs):
            df["PREDICT"] = predict_data[i]

        data = self.get_16_points(dfs)
        df = pd.DataFrame(data, columns=['C_TIME', 'C_REAL_VALUE', 'C_FP_VALUE', 'PREDICT'])
        df.to_csv(self.opt.excel_data_path + "dq+rp.csv")

        formula = self.formula_acc()
        df = pd.merge(df, formula, on='C_TIME')

        label_name = [self.opt.feature_columns[i] for i in self.opt.label_in_feature_index]
        # calculate_acc returns an accuracy, not an MSE, and the three results
        # must be distinguishable in the log, so the message names the metric
        # and the column being scored.
        for column in ('PREDICT', 'C_FP_VALUE', 'C_ABLE_VALUE'):
            acc = self.calculate_acc(df['C_REAL_VALUE'], df[column])
            self.logger.info(
                "The accuracy (1 - RMSE / cap) of {} for power {} is ".format(column, label_name)
                + str(acc)
            )

        self.preidct_draw(df['C_REAL_VALUE'].values, df['PREDICT'].values)

    def preidct_draw(self, label_data, predict_data):
        """Plot the real curve against the predicted curve.

        The method name keeps its historical spelling so existing callers
        continue to work.

        Args:
            label_data: Array of real values; ``shape[0]`` sets the X axis.
            predict_data: Array of predicted values plotted on the same axis.
        """
        X = list(range(label_data.shape[0]))
        print("label_x = ", X)
        label_column_num = len(self.opt.label_columns)
        label_name = [self.opt.feature_columns[i] for i in self.opt.label_in_feature_index]

        # Plotting is skipped on Linux because a desktop-less Linux cannot
        # display figures; drop this guard on a Linux with a desktop (Ubuntu).
        if sys.platform.startswith('linux'):
            return

        for i in range(label_column_num):
            plt.figure(i + 1)  # one figure per label column
            plt.plot(X, label_data, label='label', color='b')
            plt.plot(X, predict_data, label='predict', color='g')
            if self.opt.do_figure_save:
                plt.savefig(
                    self.opt.figure_save_path
                    + "{}predict_{}_with_{}.png".format(
                        self.opt.continue_flag, label_name[i], self.opt.used_frame
                    )
                )
        plt.show()

    def tangle_results(self):
        """Placeholder; currently does nothing."""
        pass