#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/4/12 17:42
# file: data_features.py
# author: David
# company: shenyang JY
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
# NOTE(review): `datetime_to_timestr` used below is not defined in this file;
# presumably it is provided by this wildcard import -- confirm.
from data_utils import *


class data_features(object):
    """Turn time-indexed frames into normalized sliding-window arrays.

    The instance reads its configuration from ``opt``:
    ``opt.Model["time_step"]``, ``opt.Model["random_seed"]``,
    ``opt.predict_points``, ``opt.label_columns``, ``opt.valid_data_rate``
    and ``opt.shuffle_train_data``. It also writes ``opt.feature_columns``,
    ``opt.label_in_feature_index`` and ``opt.input_size``.
    """

    def __init__(self, opt, mean, std):
        """Store the configuration and the normalization statistics.

        Args:
            opt: Configuration object (see the class docstring for the
                attributes that are read and written).
            mean: Per-column means, indexable by column name.
            std: Per-column standard deviations, indexable by column name.
        """
        self.opt = opt
        self.time_step = self.opt.Model["time_step"]
        self.mean = mean
        self.std = std
        # Column order of the most recently built feature window; consumed by
        # norm_features to line the statistics up with the array columns.
        self.columns = list()

    def _register_feature_columns(self, dfs):
        """Record feature-column bookkeeping on ``self.opt`` from ``dfs[0]``."""
        self.opt.feature_columns = dfs[0].columns.tolist()
        self.opt.feature_columns.insert(0, 'C_TIME')
        # The label positions are looked up because the features do not
        # necessarily start at index 0.
        self.opt.label_in_feature_index = [
            self.opt.feature_columns.index(label)
            for label in self.opt.label_columns
        ]
        self.opt.input_size = len(self.opt.feature_columns)

    def _windows_as_arrays(self, df):
        """Build the windows of ``df`` and stack them into numpy arrays.

        Returns:
            ``(x, y, label_windows)`` where ``x`` stacks the feature windows,
            ``y`` stacks the ``C_REAL_VALUE`` column of every label window
            with one trailing axis appended, and ``label_windows`` is the raw
            list of label frames.
        """
        feature_windows, label_windows = self.get_data_features(df)
        x = np.array(feature_windows)
        y = np.array([window['C_REAL_VALUE'].values for window in label_windows])
        y = np.expand_dims(y, axis=-1)  # append a trailing dimension
        return x, y, label_windows

    def get_train_data(self, dfs):
        """Build normalized training and validation arrays from ``dfs``.

        Each frame is windowed and split on its own with
        ``train_test_split``; the per-frame pieces are then concatenated
        along axis 0 and normalized.

        Args:
            dfs: Sequence of DataFrames; ``dfs[0]`` supplies the column names
                recorded on ``self.opt``.

        Returns:
            ``(train_x, valid_x, train_y, valid_y)`` as normalized arrays.
        """
        self._register_feature_columns(dfs)

        train_x, valid_x, train_y, valid_y = [], [], [], []
        for df in dfs:
            x, y, _ = self._windows_as_arrays(df)
            # Split into training and validation sets.
            tx, vx, ty, vy = train_test_split(
                x, y,
                test_size=self.opt.valid_data_rate,
                random_state=self.opt.Model["random_seed"],
                shuffle=self.opt.shuffle_train_data)
            train_x.append(tx)
            valid_x.append(vx)
            train_y.append(ty)
            valid_y.append(vy)

        train_x = self.norm_features(np.concatenate(train_x, axis=0))
        valid_x = self.norm_features(np.concatenate(valid_x, axis=0))
        train_y = self.norm_label(np.concatenate(train_y, axis=0))
        valid_y = self.norm_label(np.concatenate(valid_y, axis=0))
        return train_x, valid_x, train_y, valid_y

    def get_test_data(self, dfs):
        """Build normalized test arrays from ``dfs``.

        Args:
            dfs: Sequence of DataFrames; ``dfs[0]`` supplies the column names
                recorded on ``self.opt``.

        Returns:
            ``(test_x, test_y, data_y)`` where ``test_x`` and ``test_y`` are
            normalized arrays concatenated along axis 0 and ``data_y`` holds,
            per input frame, the list of un-normalized label windows.
        """
        self._register_feature_columns(dfs)

        test_x, test_y, data_y = [], [], []
        for df in dfs:
            x, y, label_windows = self._windows_as_arrays(df)
            test_x.append(x)
            test_y.append(y)
            data_y.append(label_windows)

        test_x = self.norm_features(np.concatenate(test_x, axis=0))
        test_y = self.norm_label(np.concatenate(test_y, axis=0))
        return test_x, test_y, data_y

    def get_data_features(self, df):
        """Cut ``df`` into sliding feature and label windows (pandas-based).

        The frame's index is moved into a column with ``reset_index``. The
        feature rows are all rows except the last ``opt.predict_points``; the
        label rows are the rows from ``opt.predict_points`` onward,
        re-indexed from 0, so the two share window offsets.

        Args:
            df: Input DataFrame.
                NOTE(review): assumes the index is named ``C_TIME`` and the
                columns include ``C_WD_INST120``, ``C_T`` and
                ``C_REAL_VALUE`` -- confirm against the caller.

        Returns:
            ``(features_x, features_y)``: two equally long lists of
            DataFrames. Each feature window joins the ``C_TIME`` to
            ``C_WD_INST120`` columns of the feature rows with the columns from
            ``C_T`` onward of the label rows; each label window holds
            ``C_TIME`` and ``C_REAL_VALUE``.
        """
        norm_data = df.reset_index()
        predict_points = self.opt.predict_points
        # `frame[:-0]` is `frame[:0]` (empty), so a zero horizon has to keep
        # the whole frame explicitly.
        feature_data = norm_data[:-predict_points] if predict_points else norm_data
        label_data = norm_data[predict_points:].reset_index(drop=True)

        time_step = self.opt.Model["time_step"]
        time_step_loc = time_step - 1  # .loc slices include the end label
        train_num = int(len(feature_data))
        window_starts = range(train_num - time_step)

        time_rp = [feature_data.loc[i:i + time_step_loc, 'C_TIME':'C_WD_INST120']
                   for i in window_starts]
        nwp = [label_data.loc[i:i + time_step_loc, 'C_T':]
               for i in window_starts]

        features_x = []
        for history, forecast in zip(time_rp, nwp):
            window = pd.concat([history, forecast], axis=1)
            window.set_index('C_TIME', inplace=True, drop=False)
            window["C_TIME"] = window["C_TIME"].apply(datetime_to_timestr)
            features_x.append(window)
        if features_x:
            # Only refresh the column order when a window was actually built,
            # so a frame too short for one window does not hit an unbound
            # loop variable.
            self.columns = features_x[-1].columns.tolist()

        features_y = [label_data.loc[i:i + time_step_loc, ['C_TIME', 'C_REAL_VALUE']]
                      for i in window_starts]
        return features_x, features_y

    def norm_features(self, data: np.ndarray):
        """Normalize ``data`` with the mean and std of ``self.columns``.

        The statistics are gathered in ``self.columns`` order and broadcast
        against ``data``.
        """
        mean = np.array([self.mean[col] for col in self.columns])
        std = np.array([self.std[col] for col in self.columns])
        data = (data - mean) / std  # normalize
        return data

    def norm_label(self, label_data: np.ndarray):
        """Normalize ``label_data`` with the ``C_REAL_VALUE`` mean and std."""
        return (label_data - self.mean['C_REAL_VALUE']) / self.std['C_REAL_VALUE']