#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/4/12 17:42
# file: data_features.py
# author: David
# company: shenyang JY
import pandas as pd
import numpy as np

np.random.seed(42)


class DataFeatures(object):
    """Build windowed model inputs from NWP frames and environment history.

    Every sample produced by this class is a two-element list
    ``[env_window, nwp_window]``:

    * ``env_window`` -- the last ``Model["his_points"]`` rows of the
      environment frame (``C_TIME`` column dropped);
    * ``nwp_window`` -- ``Model["time_step"]`` consecutive rows of the
      columns listed in ``opt.nwp_columns`` (``C_TIME`` excluded).

    Labels are windows holding the ``C_TIME`` and ``C_REAL_VALUE`` columns.
    """

    # A frame must yield at least this many samples before it is split into
    # training and validation parts; smaller frames are skipped.
    MIN_SPLIT_SAMPLES = 10

    def __init__(self, log, args):
        """Store the logger and resolve options.

        Args:
            log: Logger-like object; only ``info`` is called by this class.
            args: Object exposing ``parse_args_and_yaml()``, whose result is
                kept as ``self.opt``.
        """
        self.logger = log
        self.args = args
        self.opt = self.args.parse_args_and_yaml()
        self.columns = list()

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------
    @staticmethod
    def _pack_inputs(samples):
        """Convert ``[env_df, nwp_df]`` samples into ``[env_array, nwp_array]``.

        Each array stacks the ``.values`` of the corresponding frame of every
        sample along a new leading axis (assumes all windows share one shape).
        """
        env_part = np.array([sample[0].values for sample in samples])
        nwp_part = np.array([sample[1].values for sample in samples])
        return [env_part, nwp_part]

    @staticmethod
    def _stack_labels(label_frames):
        """Stack the second column of every label window into one array.

        The second column is ``C_REAL_VALUE`` for windows produced by
        ``get_data_features``.
        """
        return np.array([frame.iloc[:, 1].values for frame in label_frames])

    def _nwp_feature_columns(self):
        """Return a copy of ``opt.nwp_columns`` without its ``C_TIME`` entry."""
        columns = self.opt.nwp_columns.copy()
        if 'C_TIME' in columns:
            columns.remove('C_TIME')
        return columns

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------
    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        """Split paired samples into training and validation subsets.

        The validation subset holds ``int(len(datax) * valid_rate)`` samples:
        the trailing ones when ``shuffle`` is false, a random selection
        otherwise. Within each returned list the samples keep their original
        relative order.

        Args:
            datax: Sequence of input samples.
            datay: Sequence of labels, paired positionally with ``datax``.
            valid_rate: Fraction of samples assigned to validation.
            shuffle: Whether validation members are chosen at random.

        Returns:
            Tuple ``(train_x, valid_x, train_y, valid_y)`` of lists.
        """
        total = len(datax)
        # Drawn even when ``shuffle`` is false so that consumption of the
        # global NumPy RNG stays the same as in earlier versions.
        permuted = np.random.permutation(total)
        order = permuted.tolist() if shuffle else list(range(total))

        # Clamp so that a zero-sized validation set leaves every sample in
        # training (a ``[-0:]`` slice would select the whole list instead).
        valid_size = min(max(int(total * valid_rate), 0), total)
        # A set gives O(1) membership tests inside the loop below.
        valid_ids = set(order[total - valid_size:])

        tx, vx, ty, vy = [], [], [], []
        for position, (x_item, y_item) in enumerate(zip(datax, datay)):
            if position in valid_ids:
                vx.append(x_item)
                vy.append(y_item)
            else:
                tx.append(x_item)
                ty.append(y_item)
        return tx, vx, ty, vy

    def get_train_data(self, dfs, envir):
        """Build training and validation arrays from a list of frames.

        Frames shorter than ``Model["time_step"]`` rows, or yielding fewer
        than ``MIN_SPLIT_SAMPLES`` samples, are skipped and logged.

        Args:
            dfs: Iterable of DataFrames with NWP columns and labels.
            envir: Environment DataFrame containing a ``C_TIME`` column.

        Returns:
            ``(train_x, valid_x, train_y, valid_y)`` where each ``*_x`` is a
            ``[env_array, nwp_array]`` pair and each ``*_y`` is an array of
            stacked ``C_REAL_VALUE`` windows.
        """
        time_step = self.opt.Model["time_step"]
        skipped = 1  # running number reported in the skip log messages
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for df in dfs:
            if len(df) < time_step:
                self.logger.info("特征处理-训练数据-不满足time_step +{}".format(skipped))
                skipped += 1
                continue
            datax, datay = self.get_data_features(df, envir, is_train=True)
            if len(datax) < self.MIN_SPLIT_SAMPLES:
                self.logger.info("特征处理-训练数据-无法进行最小分割 +{}".format(skipped))
                skipped += 1
                continue
            tx, vx, ty, vy = self.train_valid_split(
                datax,
                datay,
                valid_rate=self.opt.Model["valid_data_rate"],
                shuffle=self.opt.Model['shuffle_train_data'],
            )
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)

        train_y = self._stack_labels(train_y)
        valid_y = self._stack_labels(valid_y)
        train_x = self._pack_inputs(train_x)
        valid_x = self._pack_inputs(valid_x)
        return train_x, valid_x, train_y, valid_y

    def get_test_data(self, dfs, envir):
        """Build test arrays from a list of frames.

        Frames shorter than ``Model["time_step"]`` rows are skipped and logged.

        Args:
            dfs: Iterable of DataFrames with NWP columns and labels.
            envir: Environment DataFrame containing a ``C_TIME`` column.

        Returns:
            ``(test_x, test_y, data_y)``: ``test_x`` is a
            ``[env_array, nwp_array]`` pair, ``test_y`` an array of stacked
            ``C_REAL_VALUE`` windows and ``data_y`` the list of the original
            label windows (DataFrames including ``C_TIME``).
        """
        time_step = self.opt.Model["time_step"]
        skipped = 1  # starts at 1, matching the numbering in get_train_data
        test_x, test_y, data_y = [], [], []
        for df in dfs:
            if len(df) < time_step:
                self.logger.info("特征处理-测试数据-不满足time_step +{}".format(skipped))
                skipped += 1
                continue
            datax, datay = self.get_data_features(df, envir, is_train=False)
            test_x.extend(datax)
            test_y.extend(datay)
            data_y.extend(datay)

        test_x = self._pack_inputs(test_x)
        test_y = self._stack_labels(test_y)
        return test_x, test_y, data_y

    def get_realtime_data(self, dfs, envir):
        """Build prediction inputs from a list of frames.

        Frames shorter than ``Model["time_step"]`` rows are skipped and logged.

        Args:
            dfs: Iterable of DataFrames with NWP columns.
            envir: Environment DataFrame containing a ``C_TIME`` column.

        Returns:
            ``[env_array, nwp_array]`` pair built from all usable frames.
        """
        time_step = self.opt.Model["time_step"]
        test_x = []
        for df in dfs:
            if len(df) < time_step:
                self.logger.info("特征处理-预测数据-不满足time_step")
                continue
            test_x.extend(self.get_realtime_data_features(df, envir))
        return self._pack_inputs(test_x)

    def get_data_features(self, norm_data, envir, is_train):
        """Cut one frame into sliding windows and pair them with env history.

        For every window, environment rows whose ``C_TIME`` lies strictly
        between the window's first timestamp minus one day and that first
        timestamp are selected, and the last ``Model["his_points"]`` of them
        are kept. If fewer rows are found, the window is either filled with
        the last ``his_points`` rows of ``envir`` (``Model['fusion']`` truthy)
        or dropped.

        Args:
            norm_data: DataFrame containing the NWP columns, ``C_TIME`` and
                ``C_REAL_VALUE``.
            envir: Environment DataFrame containing a ``C_TIME`` column.
            is_train: Kept for interface compatibility; the same label
                columns are used in both modes.

        Returns:
            ``(features_x, features_y)``: a list of ``[env_window,
            nwp_window]`` DataFrame pairs and the list of matching label
            windows.
        """
        # This implementation is an optimization based on pandas methods.
        time_step = self.opt.Model["time_step"]
        his_points = self.opt.Model["his_points"]
        feature_data = norm_data.reset_index(drop=True)
        # ``.loc`` slices include the end label, hence the ``- 1``.
        last_offset = time_step - 1
        window_count = len(feature_data) - time_step + 1
        label_features = ['C_TIME', 'C_REAL_VALUE']
        nwp_cs = self._nwp_feature_columns()

        # Database field 'C_T': 'C_WS170'
        nwp = [
            feature_data.loc[start:start + last_offset, nwp_cs].reset_index(drop=True)
            for start in range(window_count)
        ]
        labels = [
            feature_data.loc[start:start + last_offset, label_features].reset_index(drop=True)
            for start in range(window_count)
        ]

        features_x, features_y = [], []
        env_fill = envir[-his_points:]
        self.logger.info("匹配环境前,{}组 -> ".format(len(nwp)))
        for nwp_window, label_window in zip(nwp, labels):
            time_end = label_window['C_TIME'].iloc[0]
            time_start = time_end - pd.DateOffset(1)
            in_range = (envir.C_TIME < time_end) & (envir.C_TIME > time_start)
            env_window = envir[in_range][-his_points:]
            if len(env_window) < his_points:
                if self.opt.Model['fusion']:
                    # Log the real shortfall before swapping in the fill rows.
                    self.logger.info(
                        "训练环境数据不足{}个点:{},用数据进行填充".format(his_points, len(env_window)))
                    env_window = env_fill
                else:
                    self.logger.info(
                        "训练环境数据不足{}个点:{},弃用".format(his_points, len(env_window)))
                    continue
            env_window = env_window.reset_index(drop=True).drop(['C_TIME'], axis=1)
            features_x.append([env_window, nwp_window])
            features_y.append(label_window)
        self.logger.info("匹配环境后,{}组".format(len(features_x)))
        return features_x, features_y

    def get_realtime_data_features(self, norm_data, envir):
        """Build the single prediction sample for one frame.

        Only the first ``Model["time_step"]`` rows of ``norm_data`` are used,
        paired with the last ``Model["his_points"]`` rows of ``envir``. When
        ``envir`` has fewer rows than that, no sample is produced.

        Args:
            norm_data: DataFrame containing the NWP columns.
            envir: Environment DataFrame containing a ``C_TIME`` column.

        Returns:
            List with zero or one ``[env_window, nwp_window]`` DataFrame pair.
        """
        # This implementation is an optimization based on pandas methods.
        time_step = self.opt.Model["time_step"]
        his_points = self.opt.Model["his_points"]
        feature_data = norm_data.reset_index(drop=True)
        nwp_cs = self._nwp_feature_columns()

        # Database field 'C_T': 'C_WS170'
        # ``.loc`` slices include the end label, hence ``time_step - 1``.
        nwp = [feature_data.loc[0:time_step - 1, nwp_cs].reset_index(drop=True)]

        features_x = []
        self.logger.info("匹配环境前,{}组 -> ".format(len(nwp)))
        for nwp_window in nwp:
            env_window = envir[-his_points:]
            if len(env_window) < his_points:
                self.logger.info("环境数据不足{}个点:{}".format(his_points, len(env_window)))
                continue
            env_window = env_window.reset_index(drop=True).drop(['C_TIME'], axis=1)
            features_x.append([env_window, nwp_window])
        self.logger.info("匹配环境后,{}组".format(len(features_x)))
        return features_x