#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: main.py
# author: David
# company: shenyang JY

import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
import yaml


class DataSet(object):
    def __init__(self, opt):
        self.std = None
        self.mean = None
        self.opt = opt
        self.time_step = self.opt.Model["time_step"]
        excel_data_path = opt.excel_data_path
        data_format = opt.data_format
        dq_path = excel_data_path + data_format["dq"]
        rp_path = excel_data_path + data_format["rp"]
        envir_path = excel_data_path + data_format["envir"]
        nwp_path = excel_data_path + data_format["nwp"]
        dq_columns = [1, 2]
        rp_columns = [0, 2]
        envir_columns = [0, *range(3, 16)]
        nwp_columns = list(range(1, 27))

        dq = self.read_data(dq_path, dq_columns)
        rp = self.read_data(rp_path, rp_columns)
        # nwp = self.read_data(nwp_path, nwp_columns)
        # rp_average(rp)  # compute the average power
        envir = self.read_data(envir_path, envir_columns)

        self.tables, self.tables_column_name = self.tables_integra(dq, rp, envir)
        # photovoltaic plants: rows with zero actual power could be filtered here
        if opt.is_photovoltaic:
            # self.tables = self.filter_data()
            pass
        self.data_num = self.tables.shape[0]
        self.train_num = int(self.data_num * opt.train_data_rate)
        # everything below is computed on the raw ndarray values
        self.norm_data = (self.tables[:, 1:] - self.mean) / self.std  # normalize to remove the physical scale
        # self.norm_data.insert(0, 'C_TIME', self.tables['C_TIME'])
        # self.set_yml({'mean': self.mean.to_dict(), 'std': self.std.to_dict()})
        self.start_num_in_test = 0

    def set_yml(self, yml_dict):
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        for k, v in yml_dict.items():
            cfg[k] = v
        with open(self.opt.config_yaml, 'w') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)

    def read_data(self, path, cols):
        init_data = pd.read_excel(path, usecols=cols)
        return init_data

    def filter_data(self):
        # actual power must be non-zero; zero means the plant was not generating
        check_table = self.tables[:, 2]
        preserve_index = list(np.nonzero(check_table)[0])
        indices = list(range(len(self.tables)))
        del_index = list(set(indices) - set(preserve_index))
        self.tables = np.delete(self.tables, del_index, axis=0)
        return self.tables

    def norm(self, tables):
        """
        Normalization: the statistics are stored in config.yml once computed.
        :param tables: joined DataFrame whose first column is the timestamp
        :return: None (sets self.mean and self.std)
        """
        mean = np.mean(tables.iloc[:, 1:], axis=0)  # per-column mean
        std = np.std(tables.iloc[:, 1:], axis=0)    # per-column standard deviation
        if not hasattr(self.opt, 'mean') or not hasattr(self.opt, 'std'):
            self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
        self.mean, self.std = mean.values, std.values
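
    # A worked example of the normalization above, with hypothetical numbers:
    # a column [10., 20., 30.] has mean 20.0 and (population) std ≈ 8.165, so
    # it normalizes to ≈ [-1.22, 0.0, 1.22]. Predictions can be de-normalized
    # later as x * std + mean using the statistics saved in config.yml.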

    def tables_integra(self, dq, rp, envir):
        """
        Join the source tables.
        :param dq: short-term forecast power
        :param rp: actual power
        :param envir: environment monitoring data
        :return: joined table (ndarray) and its column names (excluding the first, time, column)
        """
        # 1. join dq, rp and envir on the timestamp
        union_tables = pd.merge(dq, rp, on='C_TIME')
        union_tables = union_tables.merge(envir, on='C_TIME')
        self.norm(union_tables)
        return union_tables.values, union_tables.columns.tolist()[1:]

    def get_train_and_valid_data(self, case):
        feature_data = self.norm_data[:self.train_num]
        # label_data = self.norm_data[:self.train_num, self.opt.label_in_feature_index]
        # use the data shifted forward by predict_points as the labels
        label_data = self.norm_data[self.opt.predict_points: self.opt.predict_points + self.train_num,
                                    self.opt.label_in_feature_index]
        time_step = self.opt.Model["time_step"]
        train_x, train_y = [], []
        if not self.opt.do_continue_train:
            # In non-continuous training mode every time_step rows form one sample,
            # and consecutive samples are offset by one row: rows 1-20, rows 2-21, ...
            # (see the windowing sketch after this class)
            if case == 1:  # features: actual power + weather
                train_x = [feature_data[i:i + time_step] for i in range(self.train_num - time_step)]
                train_y = [label_data[i:i + time_step] for i in range(self.train_num - time_step)]
            elif case == 2:  # features: short-term forecast + actual power + weather
                train_rp = [feature_data[i:i + time_step, 1:] for i in range(self.train_num - time_step * 2)]
                train_qd = [feature_data[i + time_step: i + 2 * time_step, 0][:, np.newaxis]
                            for i in range(self.train_num - time_step * 2)]
                train_x = [list(np.append(t[0], t[1], axis=1)) for t in zip(train_rp, train_qd)]
                train_y = [label_data[i:i + time_step] for i in range(self.train_num - time_step * 2)]
        else:
            # continuous training mode: not implemented yet
            pass
        train_x, train_y = np.array(train_x), np.array(train_y)
        # split into training and validation sets, shuffled if configured
        train_x, valid_x, train_y, valid_y = train_test_split(train_x, train_y,
                                                              test_size=self.opt.valid_data_rate,
                                                              random_state=self.opt.Model["random_seed"],
                                                              shuffle=self.opt.shuffle_train_data)
        return train_x, valid_x, train_y, valid_y

    def get_test_data(self, return_label_data=False):
        feature_data = self.norm_data[self.train_num:]
        # guard against time_step exceeding the size of the test set
        sample_interval = min(feature_data.shape[0], self.time_step * 2)
        assert sample_interval == self.time_step * 2
        test_x, test_y, dq_y = [], [], []
        if self.opt.is_continuous_predict:
            test_num = len(feature_data)
            test_x = [feature_data[i: i + self.time_step] for i in range(test_num - sample_interval)]
            test_y = [feature_data[i + self.time_step: i + sample_interval, self.opt.label_in_feature_index]
                      for i in range(test_num - sample_interval)]
        else:
            # In the test data every time_step rows form one sample, and consecutive
            # samples are offset by time_step rows: rows 1-20, rows 21-40, ... to the end.
            # TODO(刘大为): rebuild the test set here
            self.start_num_in_test = feature_data.shape[0] % sample_interval  # leading rows that do not fill a whole sample_interval
            time_step_size = feature_data.shape[0] // sample_interval
            test_x = [feature_data[self.start_num_in_test + i * sample_interval:
                                   self.start_num_in_test + i * sample_interval + self.time_step]
                      for i in range(time_step_size)]
            test_y = [feature_data[self.start_num_in_test + i * sample_interval + self.time_step:
                                   self.start_num_in_test + (i + 1) * sample_interval,
                                   self.opt.label_in_feature_index]
                      for i in range(time_step_size)]
            dq_y = [feature_data[self.start_num_in_test + i * sample_interval + self.time_step:
                                 self.start_num_in_test + (i + 1) * sample_interval, 0][:, np.newaxis]
                    for i in range(time_step_size)]
            # test_x = [list(np.append(t[0], t[1], axis=1)) for t in zip(test_x, dq_y)]
            print("length of test_x:", len(test_x))
        # reshape test_x back into timestamp-sized steps
        # for i, x in enumerate(test_x):
        #     p1 = x[0:16, 0]
        #     p2 = x[16:32, 1]
        #     p = [list(t) for t in zip(p1, p2)]
        #     test_x[i] = np.array(p)
        if return_label_data:  # in real deployment the test set has no labels
            return np.array(test_x), np.array(test_y), np.array(dq_y)
        return np.array(test_x)
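

# A minimal sketch, not used by the class, of the two windowing schemes above
# for a hypothetical 1-D `series`: training takes overlapping windows offset by
# one row, while testing takes non-overlapping (input, label) pairs in which
# the label is the time_step rows that follow the input window.
def _window_demo(series, time_step):
    # training-style: stride-1 windows, e.g. rows 0-15, 1-16, 2-17, ...
    train_windows = [series[i:i + time_step]
                     for i in range(len(series) - time_step)]
    # test-style: each input window of time_step rows is paired with the
    # next time_step rows as its label; pairs are spaced 2*time_step apart
    interval = time_step * 2
    test_pairs = [(series[i:i + time_step], series[i + time_step:i + interval])
                  for i in range(0, len(series) - interval + 1, interval)]
    return train_windows, test_pairs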


if __name__ == "__main__":
    # Exploratory snippets kept for reference; DataSet requires an `opt`
    # config object, as sketched below.
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp)  # compute the average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
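
    # A minimal usage sketch: every path and value below is hypothetical, and
    # it assumes the referenced Excel files and config.yml actually exist.
    from types import SimpleNamespace

    opt = SimpleNamespace(
        excel_data_path='./data/',
        data_format={'dq': 'dq.xlsx', 'rp': 'rp.xlsx',
                     'envir': 'envir.xlsx', 'nwp': 'nwp.xlsx'},
        config_yaml='./config.yml',          # mean/std get written back here
        Model={'time_step': 16, 'random_seed': 42},
        is_photovoltaic=False,
        train_data_rate=0.8,
        valid_data_rate=0.2,
        predict_points=16,
        label_in_feature_index=[1],          # column(s) of norm_data used as labels
        do_continue_train=False,
        shuffle_train_data=True,
        is_continuous_predict=False,
    )
    ds = DataSet(opt)
    train_x, valid_x, train_y, valid_y = ds.get_train_and_valid_data(case=1)
    test_x, test_y, dq_y = ds.get_test_data(return_label_data=True)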