#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @FileName : data_handler.py
# @Time     : 2025/1/8 14:56
# @Author   : David
# @Company  : shenyang JY
import argparse
import numpy as np
import pandas as pd
from common.data_cleaning import *


class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)

    def get_train_data(self, df):
        """Build windowed training/validation arrays from a normalized frame."""
        train_x, valid_x, train_y, valid_y = [], [], [], []
        if len(df) < self.opt.Model["time_step"]:
            self.logger.info("Feature processing - training data - fewer rows than time_step")
        datax, datay = self.get_timestep_features(df, is_train=True)
        if len(datax) < 10:
            self.logger.info("Feature processing - training data - too few samples for a minimal split")
        tx, vx, ty, vy = self.train_valid_split(datax, datay,
                                                valid_rate=self.opt.Model["valid_data_rate"],
                                                shuffle=self.opt.Model["shuffle_train_data"])
        train_x.extend(tx)
        valid_x.extend(vx)
        train_y.extend(ty)
        valid_y.extend(vy)

        # Column 1 of each label window is C_REAL_VALUE; stack into (samples, time_step).
        train_y = np.array([y.iloc[:, 1].values for y in train_y])
        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])

        # Each feature window is a single DataFrame, so stack them into one
        # array of shape (samples, time_step, n_features).
        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])

        return train_x, valid_x, train_y, valid_y

    def get_timestep_features(self, norm_data, is_train):
        # Sliding-window construction optimized with pandas operations.
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = int(len(feature_data))
        label_features = ['C_TIME', 'C_REAL_VALUE']
        nwp_cs = self.opt.features
        # Database field 'C_T': 'C_WS170'
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True)
               for i in range(train_num - time_step + 1)]
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True)
                  for i in range(train_num - time_step + 1)]
        features_x, features_y = [], []
        self.logger.info("Before matching NWP windows: {} groups ->".format(len(nwp)))
        for nwp_window, label_window in zip(nwp, labels):
            features_x.append(nwp_window)
            features_y.append(label_window)
        self.logger.info("After matching: {} groups".format(len(features_x)))
        return features_x, features_y

    def fill_train_data(self, unite):
        """Split on long gaps and interpolate short ones in a 15-minute series."""
        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
        unite['time_diff'] = unite['C_TIME'].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum() / (15 * 60) - len(miss_points)
        self.logger.info("Recount: total points to interpolate: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train)
        return data_train

    def missing_time_splite(self, df, dt_short, dt_long):
        """Cut the frame into segments wherever a gap reaches dt_long."""
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]  # drop the helper time_diff column
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to fill: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, short gaps (to fill): {n_short}, long gaps (split points): {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs

    def data_fill(self, dfs, test=False):
        dfs_fill, inserts = [], 0
        for df in dfs:
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index('C_TIME', inplace=False)
            # Linear interpolation on a 15-minute grid; other fill methods
            # still need further comparison.
            dff = df1.resample('15min').interpolate(method='linear')
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, filled {} points.".format(
                dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test else "training set"
        self.logger.info("{} split into {} segments; {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill

    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        """Randomly (or sequentially) split paired feature/label lists."""
        indices = np.random.permutation(len(datax)).tolist() if shuffle else list(range(len(datax)))
        valid_size = int(len(datax) * valid_rate)
        # Guard the valid_size == 0 case: lst[-0:] would otherwise return the whole list.
        valid_index = set(indices[-valid_size:]) if valid_size > 0 else set()
        train_index = set(indices[:-valid_size]) if valid_size > 0 else set(indices)
        tx, vx, ty, vy = [], [], [], []
        for i, (x, y) in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(x)
                ty.append(y)
            elif i in valid_index:
                vx.append(x)
                vy.append(y)
        return tx, vx, ty, vy
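
# ---------------------------------------------------------------------------
# Minimal usage sketch (hypothetical, not part of the original module): it
# shows the end-to-end flow of fill_train_data followed by get_train_data on
# a toy 15-minute series. The config keys mirror the self.opt.Model[...] and
# self.opt.features lookups above, but this exact config layout, the logger
# setup, and the toy data are assumptions; running it also assumes the repo's
# common.data_cleaning package is importable.
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("data_handler")

    # Hypothetical configuration matching the attribute names this class
    # reads from argparse.Namespace(**args).
    args = {
        "features": ["C_WS170"],
        "Model": {
            "time_step": 16,
            "valid_data_rate": 0.2,
            "shuffle_train_data": True,
            "how_long_fill": 10,
            "train_data_fill": True,
        },
    }
    dh = DataHandler(logger, args)

    # Toy gap-free series with the columns the class expects.
    times = pd.date_range("2025-01-01", periods=200, freq="15min")
    df = pd.DataFrame({
        "C_TIME": times,
        "C_REAL_VALUE": np.random.rand(200),
        "C_WS170": np.random.rand(200),
    })
    segments = dh.fill_train_data(df)
    train_x, valid_x, train_y, valid_y = dh.get_train_data(
        pd.concat(segments, ignore_index=True))
    logger.info("train_x shape: %s, train_y shape: %s", train_x.shape, train_y.shape)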