"""Evaluate a saved TimeSeriesTransformer checkpoint.

The module runs the model over a DataLoader, scores the predictions with a
per-interval RMSE based accuracy, plots predictions against the actual
values and exports both series to an Excel workbook.
"""
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
from torch import nn
from torch.utils.data import DataLoader

from dataset.TimeDataset import TimeSeriesDataset
from training import TimeSeriesTransformer
from utils.Arg import Arg

arg = Arg()

# Hyper-parameters mirrored from the shared argument object. They are kept as
# module-level names because other modules may import them from here.
input_dim = arg.input_dim
output_dim = arg.output_dim
input_seq_length = arg.input_seq_length
output_seq_length = arg.output_seq_length
d_model = arg.d_model
nhead = arg.nhead
num_layers = arg.num_layers
dropout = arg.dropout
batch_size = arg.batch_size
epochs = arg.epochs
time_split = arg.time_split

# Value the interval RMSE is divided by when it is turned into an accuracy.
cap = arg.power_max

# Number of consecutive samples that are scored (and annotated) together.
POINTS_PER_INTERVAL = 96


def rmse(predictions, targets):
    """Return the root-mean-square error between two equally shaped inputs.

    Args:
        predictions: Array-like of predicted values.
        targets: Array-like of reference values.

    Returns:
        The RMSE as a NumPy float.
    """
    # Coerce to arrays so plain Python lists are accepted as well.
    predictions = np.asarray(predictions, dtype=float)
    targets = np.asarray(targets, dtype=float)
    return np.sqrt(((predictions - targets) ** 2).mean())


def calculate_rmse_for_intervals(list1, list2, step=POINTS_PER_INTERVAL):
    """Return one RMSE per consecutive chunk of ``step`` samples.

    Args:
        list1: Sequence of predicted values.
        list2: Sequence of reference values; must be as long as ``list1``.
        step: Chunk length. The last chunk may be shorter.

    Returns:
        A list with one RMSE value per chunk (empty for empty input).
    """
    assert len(list1) == len(list2), "两个列表的长度必须相等"
    return [
        rmse(np.array(list1[start:start + step]), np.array(list2[start:start + step]))
        for start in range(0, len(list1), step)
    ]


def _denormalize(values, powermax, powermin, clamp=False):
    """Return ``values * (powermax - powermin) + powermin`` as a new array.

    The input is never modified. With ``clamp=True`` negative results are
    replaced by 0.
    """
    restored = np.asarray(values, dtype=float) * (powermax - powermin) + powermin
    if clamp:
        restored = np.maximum(restored, 0)
    return restored


def calculate_accuracy(output_values, target_values, powermax, powermin):
    """Score normalised predictions against normalised targets.

    Both series are rescaled with ``powermax``/``powermin``, negative
    predictions are clamped to 0, and every interval's RMSE is converted to
    ``(1 - rmse / cap) * 100``.

    Args:
        output_values: Normalised model outputs. Not modified.
        target_values: Normalised reference values. Not modified.
        powermax: Upper bound used for the rescaling.
        powermin: Lower bound used for the rescaling.

    Returns:
        ``(acc_list, mean_accuracy)``: the per-interval accuracies and their
        arithmetic mean.

    Raises:
        ValueError: If the inputs are empty, so no interval can be scored.
    """
    # The clamp is applied to the rescaled copy that is actually scored; the
    # caller's list is left untouched.
    predicted = _denormalize(output_values, powermax, powermin, clamp=True)
    actual = _denormalize(target_values, powermax, powermin)

    interval_rmse = calculate_rmse_for_intervals(predicted, actual)
    if not interval_rmse:
        raise ValueError("cannot compute accuracy for empty input")

    acc_list = [(1 - value / cap) * 100 for value in interval_rmse]
    return acc_list, sum(acc_list) / len(acc_list)


def _run_inference(test_loader, name):
    """Run the checkpoint ``./save/{name}`` over ``test_loader``.

    Returns:
        ``(output_values, target_values, test_loss)``: flat lists of the
        model outputs and targets, and the MSE averaged over all batches.
    """
    criterion = nn.MSELoss()
    # The model is built with its default constructor arguments.
    model = TimeSeriesTransformer()
    # NOTE: torch.load unpickles the file; only load trusted checkpoints.
    # map_location lets a checkpoint saved on a GPU load on a CPU-only host.
    model.load_state_dict(torch.load(f'./save/{name}', map_location='cpu'))
    # Switch layers such as dropout to inference behaviour; no_grad() alone
    # does not do that.
    model.eval()

    output_values = []
    target_values = []
    total_loss = 0.0
    with torch.no_grad():
        for inputs_3, target in test_loader:
            output = model(inputs_3)
            total_loss += criterion(output, target).item()
            # reshape(-1) always yields a list, even for a one-element batch
            # where squeeze() would collapse to a scalar.
            output_values.extend(output.reshape(-1).tolist())
            target_values.extend(target.reshape(-1).tolist())

    # Average once, after every batch has been accumulated.
    num_batches = len(test_loader)
    test_loss = total_loss / num_batches if num_batches else 0.0
    return output_values, target_values, test_loss


def _plot_comparison(predicted, actual, acclist, step=POINTS_PER_INTERVAL):
    """Plot both series with one dashed separator and accuracy label per interval."""
    plt.plot(predicted, label='model Output')
    plt.plot(actual, label='Actual Value')
    # Iterate over the accuracies themselves so the loop can never index past
    # the end of acclist, whatever the number of samples.
    for index, accuracy in enumerate(acclist):
        left = step * index
        plt.axvline(x=left, linestyle='--', color='gray')
        label_y = plt.ylim()[1] - 5
        plt.annotate(round(accuracy, 2), xy=(left + step // 2, label_y),
                     xytext=(left, label_y), va='top', ha='left', fontsize=8)
    plt.axvline(x=len(predicted), linestyle='--', color='gray')
    plt.xlabel('Sample Index')
    plt.ylabel('Value')
    plt.legend()
    plt.show()


def _export_comparison(datetime_values, predicted, actual):
    """Write time, predicted and actual values side by side to the workbook."""
    df1 = pd.DataFrame(predicted)
    df2 = pd.DataFrame(actual)
    df3 = pd.DataFrame(datetime_values)
    df = pd.concat([df1, df2], axis=1)
    df = pd.concat([df3, df], axis=1)
    # Column headers: time, predicted power, actual power.
    df.columns = ["时间", "预测功率", "实际功率"]
    df.to_excel('./save/短期对比.xlsx', index=False, sheet_name='对比数据')


def test(test_loader, C_TIME, powermax, powermin, name):
    """Evaluate a checkpoint, then plot and export the results.

    Args:
        test_loader: Iterable of ``(inputs, target)`` batches that also
            supports ``len()``.
        C_TIME: Values written to the first column of the exported sheet.
        powermax: Upper bound used to rescale outputs and targets.
        powermin: Lower bound used to rescale outputs and targets.
        name: File name of the checkpoint inside ``./save``.

    Returns:
        ``(test_loss, acc)``: the mean batch MSE and the mean interval
        accuracy.
    """
    output_values, target_values, test_loss = _run_inference(test_loader, name)

    acclist, acc = calculate_accuracy(output_values, target_values, powermax, powermin)
    print('Test Loss: {:.4f},Test accuracy:{:.4f}'.format(test_loss, acc))
    # Range of the raw (still normalised) model outputs.
    print(np.min(output_values))
    print(np.max(output_values))

    predicted = _denormalize(output_values, powermax, powermin, clamp=True)
    actual = _denormalize(target_values, powermax, powermin)
    _plot_comparison(predicted, actual, acclist)
    _export_comparison(C_TIME, predicted, actual)
    return test_loss, acc


def data_prase(NWP_test, power_test):
    """Build the evaluation DataLoader and collect the values ``test`` needs.

    Args:
        NWP_test: Path of the NWP CSV file; its first column is returned as
            ``C_TIME``.
        power_test: Path of the power CSV file.

    Returns:
        ``(test_loader, C_TIME, powermax, powermin)`` where the two bounds
        come from ``arg.power_max`` and ``arg.power_min``.
    """
    C_TIME = pd.read_csv(NWP_test).iloc[:, 0]
    dataset = TimeSeriesDataset(power_test, NWP_test)
    test_loader = DataLoader(dataset, batch_size=arg.batch_size)
    return test_loader, C_TIME, arg.power_max, arg.power_min


def test_model(year, month, name):
    """Evaluate checkpoint ``name`` on the data files of one year and month.

    The inputs are read from ``./data/training/NWP/NWP_{year}_{month}.csv``
    and ``./data/training/power/power_{year}_{month}.csv``.

    Returns:
        ``(test_loss, acc)`` as returned by ``test``.
    """
    NWP_test = f'./data/training/NWP/NWP_{year}_{month}.csv'
    power_test = f'./data/training/power/power_{year}_{month}.csv'
    test_loader, C_TIME, powermax, powermin = data_prase(NWP_test, power_test)
    return test(test_loader, C_TIME, powermax, powermin, name)