#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/4/12 17:42
# file: data_features.py
# author: David
# company: shenyang JY
import pandas as pd
from sklearn.model_selection import train_test_split
import numpy as np
from data_utils import *
class data_features(object):
    def __init__(self, opt, mean, std):
        self.opt = opt
        self.time_step = self.opt.Model["time_step"]  # length of each sliding window
        self.mean = mean    # per-column means used for normalization
        self.std = std      # per-column standard deviations used for normalization
        self.columns_lstm = list()  # feature column names, filled in get_data_features
        self.labels = list()        # label column names, filled in get_data_features
    def get_train_data(self, dfs):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for df in dfs:
            if len(df) <= self.opt.Model["time_step"]:
                continue  # too short to build even one window
            trainx_, trainy = self.get_data_features(df)
            trainx_ = [np.array(x) for x in trainx_]
            trainy_ = [y.iloc[:, 1:].values for y in trainy]  # drop the C_TIME column
            tx, vx, ty, vy = train_test_split(trainx_, trainy_, test_size=self.opt.valid_data_rate,
                                              random_state=self.opt.Model["random_seed"],
                                              shuffle=self.opt.shuffle_train_data)  # split into training and validation sets
            # accumulate the splits from each DataFrame
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)
        # train_y = np.concatenate(train_y, axis=0)
        # valid_y = np.concatenate(valid_y, axis=0)
        train_x = self.norm_features(train_x)
        valid_x = self.norm_features(valid_x)
        train_y = self.norm_label(train_y)
        valid_y = self.norm_label(valid_y)
        print("The training set contains {} data points".format(len(train_x)))
        return np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
    def get_test_data(self, dfs):
        test_x, test_y, data_y = [], [], []
        for df in dfs:
            if len(df) <= self.opt.Model["time_step"]:
                continue  # too short to build even one window
            testx_, testy = self.get_data_features(df)
            testx_ = [np.array(x) for x in testx_]
            testy_ = [y.iloc[:, 1:].values for y in testy]  # drop the C_TIME column
            test_x.extend(testx_)
            test_y.extend(testy_)
            data_y.extend(testy)  # keep the raw label frames (including C_TIME)
        test_y = np.concatenate(test_y, axis=0)
        test_x = self.norm_features(test_x)
        test_y = self.norm_label(test_y)
        print("The test set contains {} data points".format(len(test_x)))
        return np.array(test_x), test_y, data_y
    def get_data_features(self, feature_data):  # sliding-window construction, optimized with pandas indexing
        time_step = self.opt.Model["time_step"]
        time_step_loc = time_step - 1
        train_num = int(len(feature_data))
        features_x = [feature_data.loc[i:i + time_step_loc, 'C_T':'C_WS170'] for i in range(train_num - time_step)]
        features_y = [feature_data.loc[i:i + time_step_loc, ['C_TIME', 'C_ACTIVE_POWER1', 'C_ACTIVE_POWER2', 'SUM', 'C_REAL_VALUE']] for i in range(train_num - time_step)]
        self.columns_lstm = features_x[0].columns.tolist()
        self.labels = features_y[0].columns.tolist()
        self.labels.remove('C_TIME')
        self.opt.input_size_lstm = len(self.columns_lstm)
        return features_x, features_y
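    # Windowing example (assuming time_step = 16 purely for illustration): a DataFrame
    # with N rows yields N - 16 overlapping windows; features_x[0] covers rows 0..15,
    # features_x[1] covers rows 1..16, and so on, each holding the 'C_T'..'C_WS170'
    # feature columns, while features_y[k] holds the label columns for the same rows.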
    def norm_features(self, data):
        # z-score normalization of each feature window; `data` is a list of 2-D arrays
        mean = np.array([self.mean[col] for col in self.columns_lstm])
        std = np.array([self.std[col] for col in self.columns_lstm])
        for i, d in enumerate(data):
            data[i] = (d - mean) / std
        return data
    def norm_label(self, label_data):
        # z-score normalization of the labels; accepts a list of windows or a stacked 2-D array
        mean = np.array([self.mean[col] for col in self.labels])
        std = np.array([self.std[col] for col in self.labels])
        for i, d in enumerate(label_data):
            label_data[i] = (d - mean) / std
        return label_data
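
# Minimal usage sketch (an illustration, not part of the original module): the toy
# DataFrame, the SimpleNamespace-based `opt`, and the pandas-Series mean/std below
# are assumptions about how this class is driven; the real project builds them from
# its own config and data pipeline.
if __name__ == "__main__":
    from types import SimpleNamespace

    # Toy data with the column names the class expects, ordered so that the
    # label-based slice 'C_T':'C_WS170' selects the two feature columns.
    n = 50
    df = pd.DataFrame({
        "C_TIME": pd.date_range("2023-04-12", periods=n, freq="15min"),
        "C_T": np.random.rand(n),
        "C_WS170": np.random.rand(n),
        "C_ACTIVE_POWER1": np.random.rand(n),
        "C_ACTIVE_POWER2": np.random.rand(n),
        "SUM": np.random.rand(n),
        "C_REAL_VALUE": np.random.rand(n),
    })

    # Hypothetical option object mirroring the attributes the class reads.
    opt = SimpleNamespace(
        Model={"time_step": 16, "random_seed": 42},
        valid_data_rate=0.2,
        shuffle_train_data=True,
    )

    mean = df.mean(numeric_only=True)  # per-column statistics keyed by column name
    std = df.std(numeric_only=True)

    features = data_features(opt, mean, std)
    train_x, valid_x, train_y, valid_y = features.get_train_data([df])
    print(train_x.shape, train_y.shape)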