#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/4/12 17:42
# file: data_features.py
# author: David
# company: shenyang JY
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from data_utils import *


class data_features(object):
    def __init__(self, opt, mean, std):
        self.opt = opt
        self.time_step = self.opt.Model["time_step"]
        self.mean = mean
        self.std = std
        self.columns_lstm = list()
        self.labels = list()

    def get_train_data(self, dfs):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs):
            if len(df) <= self.opt.Model["time_step"]:
                continue
            trainx_, trainy = self.get_data_features(df)
            trainx_ = [np.array(x) for x in trainx_]
            trainy_ = [y.iloc[:, 1:].values for y in trainy]
            # split the windows of this sub-series into training and validation sets
            tx, vx, ty, vy = train_test_split(trainx_, trainy_, test_size=self.opt.valid_data_rate,
                                              random_state=self.opt.Model["random_seed"],
                                              shuffle=self.opt.shuffle_train_data)
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)

        # train_y = np.concatenate(train_y, axis=0)
        # valid_y = np.concatenate(valid_y, axis=0)

        train_x = self.norm_features(train_x)
        valid_x = self.norm_features(valid_x)
        train_y = self.norm_label(train_y)
        valid_y = self.norm_label(valid_y)

        print("The training set contains {} samples".format(len(train_x)))
        return np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)

    def get_test_data(self, dfs):
        test_x, test_y, data_y = [], [], []
        for df in dfs:
            if len(df) <= self.opt.Model["time_step"]:
                continue
            testx_, testy = self.get_data_features(df)
            testx_ = [np.array(x) for x in testx_]
            testy_ = [y.iloc[:, 1:].values for y in testy]
            test_x.extend(testx_)
            test_y.extend(testy_)
            data_y.extend(testy)

        test_y = np.concatenate(test_y, axis=0)

        test_x = self.norm_features(test_x)
        test_y = self.norm_label(test_y)
        print("The test set contains {} samples".format(len(test_x)))
        return np.array(test_x), test_y, data_y

    def get_data_features(self, feature_data):
        # sliding-window feature extraction built on pandas label-based slicing
        time_step = self.opt.Model["time_step"]
        time_step_loc = time_step - 1
        train_num = len(feature_data)
        # .loc slicing is end-inclusive, so each window spans rows i .. i + time_step - 1
        features_x = [feature_data.loc[i:i + time_step_loc, 'C_T':'C_WS170'] for i in range(train_num - time_step)]
        features_y = [feature_data.loc[i:i + time_step_loc, ['C_TIME', 'C_ACTIVE_POWER1', 'C_ACTIVE_POWER2', 'SUM', 'C_REAL_VALUE']] for i in range(train_num - time_step)]
        self.columns_lstm = features_x[0].columns.tolist()
        self.labels = features_y[0].columns.tolist()
        self.labels.remove('C_TIME')
        self.opt.input_size_lstm = len(self.columns_lstm)
        return features_x, features_y

    def norm_features(self, data: list):
        # standardize every feature window column-wise with the precomputed mean/std
        mean = np.array([self.mean[col] for col in self.columns_lstm])
        std = np.array([self.std[col] for col in self.columns_lstm])
        for i, d in enumerate(data):
            data[i] = (d - mean) / std
        return data

    def norm_label(self, label_data: list):
        # standardize every label window column-wise with the precomputed mean/std
        mean = np.array([self.mean[col] for col in self.labels])
        std = np.array([self.std[col] for col in self.labels])
        for i, d in enumerate(label_data):
            label_data[i] = (d - mean) / std
        return label_data
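
# --- Illustrative usage only (not part of the original module) ---------------
# A minimal, hypothetical smoke test showing how data_features might be wired
# up. The real option object, column schema, and mean/std statistics live
# elsewhere in this project; the SimpleNamespace below, the toy DataFrame, and
# its values are assumptions made purely for demonstration.
if __name__ == "__main__":
    from types import SimpleNamespace

    # hypothetical option object mirroring only the attributes accessed above
    opt = SimpleNamespace(
        Model={"time_step": 4, "random_seed": 42},
        valid_data_rate=0.2,
        shuffle_train_data=True,
    )

    # toy frame with just the columns this class touches; C_TIME is a plain
    # counter here so that mean/std can be computed uniformly over all columns
    n = 40
    rng = np.random.default_rng(0)
    df = pd.DataFrame({
        "C_TIME": np.arange(n),
        "C_T": rng.normal(size=n),
        "C_WS170": rng.normal(size=n),
        "C_ACTIVE_POWER1": rng.normal(size=n),
        "C_ACTIVE_POWER2": rng.normal(size=n),
        "SUM": rng.normal(size=n),
        "C_REAL_VALUE": rng.normal(size=n),
    })

    features = data_features(opt, df.mean(), df.std())
    train_x, valid_x, train_y, valid_y = features.get_train_data([df])
    print(train_x.shape, valid_x.shape, train_y.shape, valid_y.shape)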