123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127 |
- import pandas as pd
- import numpy as np
- from torch import nn
- from torch.utils.data import DataLoader
- from dataset.TimeDataset import TimeSeriesDataset
- from training import TimeSeriesTransformer
- import torch
- import matplotlib.pyplot as plt
- from utils.Arg import Arg
# Shared settings object; the names below are module-level aliases for the
# attributes of the same name on it.
arg = Arg()

input_dim, output_dim = arg.input_dim, arg.output_dim
input_seq_length, output_seq_length = (
    arg.input_seq_length,
    arg.output_seq_length,
)
d_model, nhead, num_layers, dropout = (
    arg.d_model,
    arg.nhead,
    arg.num_layers,
    arg.dropout,
)
batch_size, epochs, time_split = arg.batch_size, arg.epochs, arg.time_split
def rmse(predictions, targets):
    """Return the root-mean-square error between ``predictions`` and ``targets``.

    The two arguments are subtracted element-wise, so they must support
    ``-``, ``** 2`` and ``.mean()`` (e.g. NumPy arrays of matching shape).
    The mean is taken over every element.
    """
    residual = predictions - targets
    mean_squared_error = (residual ** 2).mean()
    return np.sqrt(mean_squared_error)
def calculate_rmse_for_intervals(list1, list2, step=96):
    """Compute one RMSE per consecutive window of ``step`` samples.

    ``list1`` and ``list2`` must have the same length. They are cut into
    back-to-back windows ``[0:step]``, ``[step:2*step]``, ...; the last
    window is shorter when the length is not a multiple of ``step``.

    Returns:
        A list holding the RMSE of each window, in order (empty when the
        inputs are empty).

    Raises:
        AssertionError: If the two sequences differ in length.
    """
    assert len(list1) == len(list2), "两个列表的长度必须相等"
    window_starts = range(0, len(list1), step)
    return [
        rmse(np.array(list1[lo:lo + step]), np.array(list2[lo:lo + step]))
        for lo in window_starts
    ]
# Denominator of the accuracy formula in calculate_accuracy, taken from the
# shared settings object.
cap = arg.power_max


def calculate_accuracy(output_values, target_values, powermax, powermin):
    """Score normalised predictions against targets, one accuracy per window.

    Both sequences are mapped back from the normalised range with
    ``value * (powermax - powermin) + powermin``. Denormalised predictions
    that come out negative are clamped to 0, matching what ``test`` does
    before plotting. The data is then split into windows by
    ``calculate_rmse_for_intervals`` (default window: 96 samples) and every
    window is scored as ``(1 - RMSE / cap) * 100``.

    The input sequences are never modified.

    Args:
        output_values: Flat sequence of normalised model outputs.
        target_values: Flat sequence of normalised targets, same length.
        powermax: Upper bound used to undo the min-max normalisation.
        powermin: Lower bound used to undo the min-max normalisation.

    Returns:
        ``(acc_list, mean_accuracy)``: the per-window accuracies in order,
        and their arithmetic mean.

    Raises:
        AssertionError: If the two sequences differ in length (raised by
            ``calculate_rmse_for_intervals``).
        ZeroDivisionError: If the inputs are empty, since there is no window
            to average over.
    """
    scale = powermax - powermin
    predicted = np.asarray(output_values, dtype=float) * scale + powermin
    actual = np.asarray(target_values, dtype=float) * scale + powermin

    # The clamp has to act on the denormalised prediction, not on the raw
    # normalised input: otherwise negative predictions leak into the RMSE
    # and the caller's data gets overwritten as a side effect.
    predicted = np.maximum(predicted, 0.0)

    window_rmse = calculate_rmse_for_intervals(predicted, actual)
    acc_list = [(1 - err / cap) * 100 for err in window_rmse]
    return acc_list, sum(acc_list) / len(acc_list)
def test(test_loader, C_TIME, powermax, powermin, name):
    """Evaluate a saved model on ``test_loader``, then plot and export results.

    Steps, in order:
      1. Load the weights stored at ``./save/{name}`` into a fresh
         ``TimeSeriesTransformer`` and switch it to evaluation mode.
      2. Run every batch without gradient tracking, accumulating the MSE
         loss and collecting the flattened outputs and targets.
      3. Compute per-window accuracies with ``calculate_accuracy`` and print
         the mean loss, mean accuracy and the min/max normalised output.
      4. Denormalise outputs and targets, clamp negative outputs to 0, and
         show a plot with one dashed separator and accuracy label per window.
      5. Write time / prediction / target columns to
         ``./save/短期对比.xlsx``.

    Args:
        test_loader: Iterable of ``(inputs, target)`` batches that also
            supports ``len()``.
        C_TIME: Values for the time column of the exported sheet.
            NOTE(review): the columns are joined on their positional index,
            so if ``C_TIME`` has a different length than the predictions
            pandas pads the shorter columns with NaN - confirm this is
            intended.
        powermax: Upper bound used to undo the min-max normalisation.
        powermin: Lower bound used to undo the min-max normalisation.
        name: File name of the saved state dict inside ``./save``.

    Returns:
        ``(test_loss, acc)``: the MSE loss averaged over batches and the mean
        window accuracy.

    Raises:
        ZeroDivisionError: If ``test_loader`` yields no batches.
    """
    # Same window length as the default of calculate_rmse_for_intervals, so
    # the separators line up with the windows the accuracies were computed on.
    points_per_window = 96

    criterion = nn.MSELoss()
    model = TimeSeriesTransformer()
    # map_location keeps the load working on hosts without the device the
    # checkpoint was saved from; load_state_dict copies the values onto the
    # model's own parameters either way.
    model.load_state_dict(torch.load(f'./save/{name}', map_location='cpu'))
    # Evaluation mode: without it dropout stays active and the predictions
    # are randomly perturbed.
    model.eval()

    test_loss = 0.0
    output_values = []
    target_values = []
    with torch.no_grad():
        for inputs_3, target in test_loader:
            output = model(inputs_3)
            test_loss += criterion(output, target).item()
            # reshape(-1) rather than squeeze(): a batch holding a single
            # value would squeeze to a 0-d tensor whose tolist() is a bare
            # float, which extend() cannot consume.
            # Assumes each batch flattens to consecutive samples - TODO confirm.
            output_values.extend(output.reshape(-1).tolist())
            target_values.extend(target.reshape(-1).tolist())
    # Average once, after all batches have been accumulated.
    test_loss /= len(test_loader)

    acclist, acc = calculate_accuracy(output_values, target_values, powermax, powermin)
    print('Test Loss: {:.4f},Test accuracy:{:.4f}'.format(test_loss, acc))
    print(np.min(output_values))
    print(np.max(output_values))

    # Undo the normalisation for plotting/export; negative predictions are
    # clamped to 0.
    scale = powermax - powermin
    output_values = [max(value * scale + powermin, 0) for value in output_values]
    target_values = [value * scale + powermin for value in target_values]

    plt.plot(output_values, label='model Output')
    plt.plot(target_values, label='Actual Value')
    # Iterate over the accuracies themselves: there is exactly one per
    # window, so this cannot index past the end of acclist when the sample
    # count is an exact multiple of the window length.
    for i, window_acc in enumerate(acclist):
        left = points_per_window * i
        plt.axvline(x=left, linestyle='--', color='gray')
        label_y = plt.ylim()[1] - 5
        plt.annotate(
            round(window_acc, 2),
            xy=(left + points_per_window // 2, label_y),
            xytext=(left, label_y),
            va='top',
            ha='left',
            fontsize=8,
        )
    plt.axvline(x=len(output_values), linestyle='--', color='gray')
    plt.xlabel('Sample Index')
    plt.ylabel('Value')
    plt.legend()
    plt.show()

    df = pd.concat(
        [
            pd.DataFrame(C_TIME),
            pd.DataFrame(output_values),
            pd.DataFrame(target_values),
        ],
        axis=1,
    )
    df.columns = ["时间", "预测功率", "实际功率"]
    df.to_excel('./save/短期对比.xlsx', index=False, sheet_name='对比数据')
    return test_loss, acc
def data_prase(NWP_test, power_test):
    """Build the evaluation ``DataLoader`` and read the time column.

    Args:
        NWP_test: Path of the NWP CSV file. Its first column is returned as
            ``C_TIME``.
        power_test: Path of the power CSV file. It is handed to
            ``TimeSeriesDataset`` together with ``NWP_test``.

    Returns:
        ``(test_loader, C_TIME, powermax, powermin)`` where ``powermax`` and
        ``powermin`` are ``arg.power_max`` and ``arg.power_min``.
    """
    # Only the first NWP column is needed here; the power file is not parsed
    # in this function because nothing here uses its contents.
    C_TIME = pd.read_csv(NWP_test).iloc[:, 0]
    dataset = TimeSeriesDataset(power_test, NWP_test)
    # shuffle=False is the DataLoader default; spelled out because the
    # caller lines predictions up with C_TIME by position.
    test_loader = DataLoader(dataset, batch_size=arg.batch_size, shuffle=False)
    return test_loader, C_TIME, arg.power_max, arg.power_min
def test_model(year, month, name):
    """Evaluate the saved model ``name`` on the data files of ``year``/``month``.

    The NWP and power CSV paths are derived from ``year`` and ``month``,
    turned into a loader by ``data_prase`` and passed on to ``test``.

    Returns:
        ``(test_loss, acc)`` exactly as returned by ``test``.
    """
    nwp_csv = f'./data/training/NWP/NWP_{year}_{month}.csv'
    power_csv = f'./data/training/power/power_{year}_{month}.csv'
    loader, timestamps, upper, lower = data_prase(nwp_csv, power_csv)
    loss, accuracy = test(loader, timestamps, upper, lower, name)
    return loss, accuracy
|