#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: main.py
# author: David
# company: shenyang JY
- import pandas as pd
- import numpy as np
- from sklearn.model_selection import train_test_split
- import yaml
class DataSet(object):
    """Join the forecast, actual-power and environment sheets and cut them into samples.

    The three Excel sheets are merged on their ``C_TIME`` column, per-column
    mean and standard deviation are computed over every column except the
    first (time) column, and the standardised values are kept in
    ``norm_data``.  ``get_train_and_valid_data`` and ``get_test_data`` then
    slice ``norm_data`` into fixed-length windows.
    """

    # Column positions passed to ``pandas.read_excel(usecols=...)`` per sheet.
    DQ_COLUMNS = [1, 2]
    RP_COLUMNS = [0, 2]
    ENVIR_COLUMNS = [0, *range(3, 16)]

    def __init__(self, opt):
        """Read the sheets, merge them and standardise the merged table.

        :param opt: option object.  This constructor reads
            ``opt.Model["time_step"]``, ``opt.excel_data_path``,
            ``opt.data_format`` (keys ``"dq"``, ``"rp"``, ``"envir"``) and
            ``opt.train_data_rate``.  ``norm`` additionally checks ``opt``
            for ``mean``/``std`` and may write to ``opt.config_yaml``.
        """
        self.std = None
        self.mean = None
        self.opt = opt
        self.time_step = self.opt.Model["time_step"]

        # File locations are the base path with the per-sheet name appended
        # verbatim (plain string concatenation, no separator is inserted).
        excel_data_path = opt.excel_data_path
        data_format = opt.data_format
        dq = self.read_data(excel_data_path + data_format["dq"], self.DQ_COLUMNS)
        rp = self.read_data(excel_data_path + data_format["rp"], self.RP_COLUMNS)
        envir = self.read_data(excel_data_path + data_format["envir"], self.ENVIR_COLUMNS)

        # tables_integra also calls norm(), which fills self.mean / self.std.
        self.tables, self.tables_column_name = self.tables_integra(dq, rp, envir)
        # NOTE: filter_data() is deliberately not applied here, including for
        # photovoltaic sites, where the call was already disabled.

        self.data_num = self.tables.shape[0]
        self.train_num = int(self.data_num * opt.train_data_rate)
        # All further computation happens on a plain float ndarray.
        self.norm_data = self._normalize(self.tables)
        # Offset of the first row used by non-continuous test sampling.
        self.start_num_in_test = 0

    def _normalize(self, tables):
        """Return the standardised feature columns of *tables* as float64."""
        # Column 0 is the time column; the merged array is object-typed
        # because of it, so cast the remaining columns to float explicitly.
        features = tables[:, 1:].astype(np.float64)
        std = np.asarray(self.std, dtype=np.float64)
        # A constant column has std == 0; divide by 1 instead so it maps to 0
        # rather than producing NaN/inf.
        safe_std = np.where(std == 0, 1.0, std)
        return (features - self.mean) / safe_std

    def set_yml(self, yml_dict):
        """Merge *yml_dict* into the YAML file at ``opt.config_yaml``.

        Existing top-level keys are overwritten; the file is re-serialised,
        so comments and formatting in it are not preserved.

        :param yml_dict: mapping of top-level keys to store.
        """
        with open(self.opt.config_yaml, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty document.
            cfg = yaml.safe_load(f) or {}
        cfg.update(yml_dict)
        # Write with the same encoding that was used for reading.
        with open(self.opt.config_yaml, 'w', encoding='utf-8') as f:
            yaml.safe_dump(cfg, f, default_flow_style=False)

    def read_data(self, path, cols):
        """Read the columns *cols* of the Excel file at *path* into a DataFrame."""
        return pd.read_excel(path, usecols=cols)

    def filter_data(self):
        """Drop every row of ``self.tables`` whose column 2 is zero.

        The original comment describes column 2 as the actual power, where
        zero means nothing was generated.

        :return: the filtered table (also stored back on ``self.tables``).
        """
        actual_power = self.tables[:, 2]
        keep_rows = np.nonzero(actual_power)[0]
        self.tables = self.tables[keep_rows]
        return self.tables

    def norm(self, tables):
        """Compute per-column mean/std and remember them for normalisation.

        Statistics cover every column except the first.  They are written to
        the YAML config only when ``opt`` does not already carry both
        ``mean`` and ``std``; ``self.mean`` / ``self.std`` are always set
        from the freshly computed values.

        :param tables: merged DataFrame whose first column is the time column.
        :return: None
        """
        features = tables.iloc[:, 1:]
        mean = features.mean(axis=0)
        # Population standard deviation (ddof=0), matching numpy's default.
        std = features.std(axis=0, ddof=0)
        if not (hasattr(self.opt, 'mean') and hasattr(self.opt, 'std')):
            self.set_yml({'mean': mean.to_dict(), 'std': std.to_dict()})
        self.mean, self.std = mean.values, std.values

    def tables_integra(self, dq, rp, envir):
        """Join the three tables on ``C_TIME`` and compute their statistics.

        :param dq: short-term forecast power
        :param rp: actual power
        :param envir: environment measurements
        :return: (joined table as ndarray, column names without the first
            time column)
        """
        union_tables = pd.merge(dq, rp, on='C_TIME').merge(envir, on='C_TIME')
        self.norm(union_tables)
        return union_tables.values, union_tables.columns.tolist()[1:]

    def get_train_and_valid_data(self, case):
        """Build sliding-window samples from the training rows and split them.

        Consecutive samples are shifted by one row (rows 1-20, 2-21, ...).

        :param case: 1 - every feature column of a window is the input
            (described in the original as actual power + weather);
            2 - the non-forecast columns of one window followed by the
            forecast column (column 0) of the next window
            (short-term forecast + actual power + weather).
        :return: train_x, valid_x, train_y, valid_y
        :raises ValueError: for continuous-training mode (not implemented),
            for an unsupported *case*, or from ``train_test_split`` when no
            sample can be built.
        """
        if self.opt.do_continue_train:
            raise ValueError("continuous training mode is not implemented")
        if case not in (1, 2):
            raise ValueError("unsupported case: {!r} (expected 1 or 2)".format(case))

        time_step = self.opt.Model["time_step"]
        feature_data = self.norm_data[:self.train_num]
        # Labels are the label column(s) shifted forward by predict_points rows.
        first_label = self.opt.predict_points
        label_data = self.norm_data[first_label:first_label + self.train_num,
                                    self.opt.label_in_feature_index]
        # The shifted label slice can be shorter than the feature slice near
        # the end of the table; never start a window that it cannot fill.
        usable_rows = min(len(feature_data), len(label_data))

        if case == 1:
            starts = range(usable_rows - time_step)
            train_x = [feature_data[i:i + time_step] for i in starts]
        else:
            starts = range(usable_rows - time_step * 2)
            train_x = [
                np.hstack((feature_data[i:i + time_step, 1:],
                           feature_data[i + time_step:i + 2 * time_step, :1]))
                for i in starts
            ]
        train_y = [label_data[i:i + time_step] for i in starts]

        train_x, train_y = np.array(train_x), np.array(train_y)
        # Split into training and validation sets, optionally shuffling.
        train_x, valid_x, train_y, valid_y = train_test_split(
            train_x, train_y,
            test_size=self.opt.valid_data_rate,
            random_state=self.opt.Model["random_seed"],
            shuffle=self.opt.shuffle_train_data)
        return train_x, valid_x, train_y, valid_y

    def get_test_data(self, return_label_data=False):
        """Cut the rows after the training part into test samples.

        Each sample spans ``2 * time_step`` rows: the first half is the
        input, the label column(s) of the second half are the target.  In
        continuous mode consecutive samples are shifted by one row; otherwise
        they do not overlap and the leading rows that do not fill a whole
        sample are skipped (their count is stored in ``start_num_in_test``).

        :param return_label_data: also return the targets and the column-0
            values of the target rows.  In continuous mode the latter is an
            empty array.
        :return: ``test_x``, or ``(test_x, test_y, dq_y)`` when
            *return_label_data* is true.
        :raises ValueError: if fewer than ``2 * time_step`` test rows exist.
        """
        feature_data = self.norm_data[self.train_num:]
        test_num = feature_data.shape[0]
        sample_interval = self.time_step * 2
        if test_num < sample_interval:
            raise ValueError(
                "test set has {} rows but at least {} (2 * time_step) are required"
                .format(test_num, sample_interval))

        label_index = self.opt.label_in_feature_index
        dq_y = []
        if self.opt.is_continuous_predict:
            starts = range(test_num - sample_interval)
        else:
            # Leading rows that do not fill a whole sample are left out.
            self.start_num_in_test = test_num % sample_interval
            starts = range(self.start_num_in_test,
                           test_num - sample_interval + 1,
                           sample_interval)

        test_x = [feature_data[s:s + self.time_step] for s in starts]
        test_y = [feature_data[s + self.time_step:s + sample_interval, label_index]
                  for s in starts]
        if not self.opt.is_continuous_predict:
            dq_y = [feature_data[s + self.time_step:s + sample_interval, 0][:, np.newaxis]
                    for s in starts]
            print("test_x的长度为:", len(test_x))

        if return_label_data:  # a real-world test set carries no label data
            return np.array(test_x), np.array(test_y), np.array(dq_y)
        return np.array(test_x)
if __name__ == "__main__":
    # NOTE(review): DataSet.__init__ takes a required `opt` argument, so this
    # call raises TypeError as written. Build the option object first and
    # pass it in -- how `opt` is created is not visible in this file; confirm
    # against the project's entry point.
    ds = DataSet()
    # Earlier manual walkthrough, kept for reference. It refers to names that
    # are not defined in this module (dq_path, dq_columns, rp_average,
    # tables_norm_result) and would need updating before it can be re-enabled.
    # dq = ds.read_data(dq_path, dq_columns)[0]
    # rp = ds.read_data(rp_path, rp_columns)[0]
    # # rp_average(rp) # compute the average power
    # envir = ds.read_data(envir_path, envir_columns)[0]
    # tables = ds.tables_integra(dq, rp, envir)
    # ds.tables_norm_result(tables)
|