- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2023/4/12 17:42
- # file: data_features.py
- # author: David
- # company: shenyang JY
- import pandas as pd
- import numpy as np
- np.random.seed(42)
class DataFeatures(object):
    """Build model-ready feature/label windows for power forecasting.

    Slides a ``time_step``-long window over normalized NWP (numerical
    weather prediction) dataframes and pairs each window with the most
    recent ``his_points`` rows of environment data, producing the
    ``[env_history, nwp_window]`` inputs and ``[zone_labels, power_labels]``
    targets consumed by the model.
    """

    def __init__(self, log, args):
        """
        :param log: logger used for progress/diagnostic messages
        :param args: argument holder exposing ``parse_args_and_yaml()``
        """
        self.logger = log
        self.args = args
        # Parsed run options (YAML + CLI); accessed as self.opt.Model[...],
        # self.opt.zone, self.opt.nwp_columns below.
        self.opt = self.args.parse_args_and_yaml()
        self.columns = list()

    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        """Split paired sample lists into train and validation subsets.

        :param datax: list of feature samples
        :param datay: list of label samples, parallel to ``datax``
        :param valid_rate: fraction (0..1) reserved for validation
        :param shuffle: when True, split over a random permutation of indices
        :return: (train_x, valid_x, train_y, valid_y)
        """
        if shuffle:
            indexs = np.random.permutation(len(datax)).tolist()
        else:
            indexs = list(range(len(datax)))
        valid_size = int(len(datax) * valid_rate)
        # BUG FIX: the original sliced indexs[-valid_size:] unconditionally;
        # with valid_size == 0 that is indexs[-0:] == the WHOLE list, putting
        # every sample into validation and leaving the training set empty.
        # Sets also replace the original O(n) `in list` tests per sample.
        valid_index = set(indexs[-valid_size:]) if valid_size > 0 else set()
        tx, vx, ty, vy = [], [], [], []
        for i, (x, y) in enumerate(zip(datax, datay)):
            if i in valid_index:
                vx.append(x)
                vy.append(y)
            else:
                tx.append(x)
                ty.append(y)
        return tx, vx, ty, vy

    def get_train_data(self, dfs, envir):
        """Assemble training/validation arrays from a list of dataframes.

        :param dfs: iterable of normalized per-plant dataframes
        :param envir: environment dataframe with a C_TIME column
        :return: (train_x, valid_x, train_y, valid_y) where each *_x is
            [env_history array, nwp array] and each *_y is
            [zone-label array, real-power array]
        """
        num = 1  # running count of skipped dataframes (log bookkeeping only)
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            # Too short to cut even one window.
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("特征处理-训练数据-不满足time_step +{}".format(num))
                num += 1
                continue
            datax, datay = self.get_data_features(df, envir, is_train=True)
            # Too few samples to split meaningfully.
            if len(datax) < 10:
                self.logger.info("特征处理-训练数据-无法进行最小分割 +{}".format(num))
                num += 1
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])  # 划分训练和验证集
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)
        # y[0] is the zone-label frame, y[1] the power frame whose column 1
        # is C_REAL_VALUE (see get_data_features).
        train_y = [np.array([y[0].values for y in train_y]), np.concatenate([[y[1].iloc[:, 1].values for y in train_y]])]
        valid_y = [np.array([y[0].values for y in valid_y]), np.concatenate([[y[1].iloc[:, 1].values for y in valid_y]])]
        # x[0] is the environment-history frame, x[1] the NWP window.
        train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
        valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]
        return train_x, valid_x, train_y, valid_y

    def get_test_data(self, dfs, envir):
        """Assemble test arrays plus the raw label frames.

        :param dfs: iterable of normalized per-plant dataframes
        :param envir: environment dataframe with a C_TIME column
        :return: (test_x, data_y) — test_x is [env array, nwp array],
            data_y the untouched [zone, power] label-frame pairs
        """
        num = 0
        test_x, data_y = [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("特征处理-测试数据-不满足time_step +{}".format(num))
                num += 1
                continue
            datax, datay = self.get_data_features(df, envir, is_train=False)
            test_x.extend(datax)
            # NOTE(review): the original also kept a `test_y` copy of these
            # labels whose only consumer was commented out; dropped here.
            data_y.extend(datay)
        test_x = [np.array([x[0].values for x in test_x]), np.array([x[1].values for x in test_x])]
        return test_x, data_y

    def get_realtime_data(self, dfs, envir):
        """Assemble prediction-time inputs (no labels).

        :param dfs: iterable of normalized per-plant dataframes
        :param envir: environment dataframe with a C_TIME column
        :return: [env array, nwp array] stacked over all usable frames
        """
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("特征处理-预测数据-不满足time_step")
                continue
            datax = self.get_realtime_data_features(df, envir)
            test_x.extend(datax)
        test_x = [np.array([x[0].values for x in test_x]), np.array([x[1].values for x in test_x])]
        return test_x

    def get_data_features(self, norm_data, envir, is_train):  # 这段代码基于pandas方法的优化
        """Cut sliding windows and match each to its environment history.

        :param norm_data: normalized dataframe (C_TIME, NWP, zone, power cols)
        :param envir: environment dataframe with a C_TIME column
        :param is_train: kept for interface compatibility (see NOTE below)
        :return: (features_x, features_y); features_x items are
            [env_history_df, nwp_window_df], features_y items are
            [zone_labels_df, power_labels_df]
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = int(len(feature_data))
        # NOTE(review): the original ternary returned this same list on both
        # branches of `is_train`, so the conditional was dead — collapsed.
        label_features_power = ['C_TIME', 'C_REAL_VALUE']
        label_features_zone = list(self.opt.zone.keys())
        nwp_cs = [c for c in self.opt.nwp_columns if c not in ['C_TIME'] + label_features_zone]
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step)]  # 数据库字段 'C_T': 'C_WS170'
        labels_power = [feature_data.loc[i:i + time_step_loc, label_features_power].reset_index(drop=True) for i in range(train_num - time_step)]
        labels_zone = [feature_data.loc[i:i + time_step_loc, label_features_zone].reset_index(drop=True) for i in range(train_num - time_step)]
        features_x, features_y = [], []
        self.logger.info("匹配环境前,{}组 -> ".format(len(nwp)))
        his_points = self.opt.Model["his_points"]
        for nwp_win, power_win, zone_win in zip(nwp, labels_power, labels_zone):
            # Window start time; take the preceding 24 h of environment data.
            time_end = power_win['C_TIME'][0]
            time_start = time_end - pd.DateOffset(1)
            env = envir[(envir.C_TIME < time_end) & (envir.C_TIME > time_start)][-his_points:]
            if len(env) < his_points:
                # NOTE(review): the original assigned a tail-of-envir fallback
                # here but then `continue`d, so the fallback never took effect;
                # samples lacking history are simply skipped (kept behavior).
                self.logger.info("环境数据不足{}个点:{}".format(his_points, len(env)))
                continue
            env = env.reset_index(drop=True).drop(['C_TIME'], axis=1)
            features_x.append([env, nwp_win])
            features_y.append([zone_win, power_win])
        self.logger.info("匹配环境后,{}组".format(len(features_x)))
        return features_x, features_y

    def get_realtime_data_features(self, norm_data, envir):  # 这段代码基于pandas方法的优化
        """Cut the single leading window for realtime prediction.

        :param norm_data: normalized dataframe (C_TIME + NWP columns)
        :param envir: environment dataframe with a C_TIME column
        :return: features_x — at most one [env_history_df, nwp_window_df] pair
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        nwp_cs = [c for c in self.opt.nwp_columns if c != 'C_TIME']
        # Only the first window is needed at prediction time.
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(1)]  # 数据库字段 'C_T': 'C_WS170'
        features_x = []
        self.logger.info("匹配环境前,{}组 -> ".format(len(nwp)))
        his_points = self.opt.Model["his_points"]
        for window in nwp:
            env = envir[-his_points:]
            if len(env) < his_points:
                self.logger.info("环境数据不足{}个点:{}".format(his_points, len(env)))
                continue
            env = env.reset_index(drop=True).drop(['C_TIME'], axis=1)
            features_x.append([env, window])
        self.logger.info("匹配环境后,{}组".format(len(features_x)))
        return features_x
|