123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2023/4/12 17:42
- # file: data_features.py
- # author: David
- # company: shenyang JY
- import pandas as pd
- from sklearn.model_selection import train_test_split
- import numpy as np
- from data_utils import *
class data_features(object):
    """Turn time-series frames into sliding-window arrays and normalise them.

    Each input frame is expected to provide the columns ``C_TIME``,
    ``C_REAL_VALUE`` and ``C_FP_VALUE``. Feature windows combine ``C_TIME``
    and ``C_REAL_VALUE`` from the current rows with ``C_FP_VALUE`` taken
    ``opt.predict_points`` rows ahead; labels are the ``C_REAL_VALUE`` of
    those shifted rows.
    """

    # Column order of every feature window built by ``get_data_features``:
    # the two selected columns followed by the appended forecast column.
    _FEATURE_COLUMNS = ('C_TIME', 'C_REAL_VALUE', 'C_FP_VALUE')

    def __init__(self, opt, mean, std):
        """Store the configuration and the normalisation statistics.

        Args:
            opt: Configuration object. This class reads ``opt.Model["time_step"]``,
                ``opt.Model["random_seed"]``, ``opt.predict_points``,
                ``opt.valid_data_rate`` and ``opt.shuffle_train_data``.
            mean: Mapping indexable by column name, giving each column's mean.
            std: Mapping indexable by column name, giving each column's
                standard deviation.
        """
        self.opt = opt
        self.time_step = self.opt.Model["time_step"]
        self.mean = mean
        self.std = std
        self.columns = list()

    def _frame_to_arrays(self, df):
        """Convert one frame into (feature array, label array, label windows)."""
        datax, datay = self.get_data_features(df)
        x = np.array(datax)
        y = np.array([window['C_REAL_VALUE'].values for window in datay])
        # Append a trailing axis of length one to the label array.
        y = np.expand_dims(y, axis=-1)
        return x, y, datay

    def get_train_data(self, dfs):
        """Build normalised training and validation arrays.

        Every frame is windowed and split into train/validation parts on its
        own; the parts are then concatenated along axis 0 and normalised.

        Args:
            dfs: Iterable of DataFrames.

        Returns:
            Tuple ``(train_x, valid_x, train_y, valid_y)`` of numpy arrays.
        """
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for df in dfs:
            x, y, _ = self._frame_to_arrays(df)
            # Split this frame's windows into training and validation sets.
            tx, vx, ty, vy = train_test_split(
                x, y,
                test_size=self.opt.valid_data_rate,
                random_state=self.opt.Model["random_seed"],
                shuffle=self.opt.shuffle_train_data,
            )
            train_x.append(tx)
            valid_x.append(vx)
            train_y.append(ty)
            valid_y.append(vy)

        train_x = self.norm_features(np.concatenate(train_x, axis=0))
        valid_x = self.norm_features(np.concatenate(valid_x, axis=0))
        train_y = self.norm_label(np.concatenate(train_y, axis=0))
        valid_y = self.norm_label(np.concatenate(valid_y, axis=0))
        return train_x, valid_x, train_y, valid_y

    def get_test_data(self, dfs):
        """Build normalised test arrays plus the raw label windows.

        Args:
            dfs: Iterable of DataFrames.

        Returns:
            Tuple ``(test_x, test_y, data_y)``. ``test_x`` and ``test_y`` are
            normalised numpy arrays concatenated over all frames; ``data_y``
            holds, per frame, the un-normalised list of label windows returned
            by ``get_data_features``.
        """
        test_x, test_y, data_y = [], [], []
        for df in dfs:
            x, y, datay = self._frame_to_arrays(df)
            test_x.append(x)
            test_y.append(y)
            data_y.append(datay)

        test_x = self.norm_features(np.concatenate(test_x, axis=0))
        test_y = self.norm_label(np.concatenate(test_y, axis=0))
        return test_x, test_y, data_y

    def get_data_features(self, df):
        """Slice one frame into aligned feature and label windows.

        Also records the feature column order in ``self.columns`` so that
        ``norm_features`` can look up the matching statistics.

        Args:
            df: DataFrame with ``C_TIME``, ``C_REAL_VALUE`` and ``C_FP_VALUE``
                columns.

        Returns:
            Tuple ``(features_x, features_y)`` of equally long lists of
            DataFrames. Both lists are empty when the frame is too short to
            produce a window.
        """
        norm_data = df.reset_index()
        predict_points = self.opt.predict_points
        # NOTE(review): a predict_points of 0 makes this slice empty
        # (``[:-0]``) - confirm that 0 is never configured.
        feature_data = norm_data[:-predict_points]
        label_data = norm_data[predict_points:].reset_index(drop=True)

        time_step = self.opt.Model["time_step"]
        last_offset = time_step - 1  # .loc label slices include the end label
        # NOTE(review): this count leaves out the final possible window
        # (start == len(feature_data) - time_step) - confirm this is intended.
        window_count = len(feature_data) - time_step

        features_x = []
        for start in range(window_count):
            stop = start + last_offset
            # Copy so the column assignment below never writes into a slice.
            window = feature_data.loc[start:stop, ['C_TIME', 'C_REAL_VALUE']].copy()
            window['C_FP_VALUE'] = label_data.loc[start:stop, 'C_FP_VALUE']
            window = window.set_index('C_TIME', drop=False)
            # datetime_to_timestr comes from data_utils; presumably it yields a
            # value that norm_features can normalise - TODO confirm.
            window["C_TIME"] = window["C_TIME"].apply(datetime_to_timestr)
            features_x.append(window)

        # The column order is fixed, so it stays defined even when the frame
        # produced no windows.
        self.columns = list(self._FEATURE_COLUMNS)

        features_y = [
            label_data.loc[start:start + last_offset, ['C_TIME', 'C_REAL_VALUE', 'C_FP_VALUE']]
            for start in range(window_count)
        ]
        return features_x, features_y

    def norm_features(self, data: np.ndarray):
        """Normalise ``data`` with the per-column mean/std of ``self.columns``.

        The statistics are broadcast against the last axis of ``data``.
        """
        mean = np.array([self.mean[col] for col in self.columns])
        std = np.array([self.std[col] for col in self.columns])
        return (data - mean) / std

    def norm_label(self, label_data: np.ndarray):
        """Normalise labels with the ``C_REAL_VALUE`` mean and std."""
        return (label_data - self.mean['C_REAL_VALUE']) / self.std['C_REAL_VALUE']
|