#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName  :data_handler.py
# @Time      :2025/1/8 14:56
# @Author    :David
# @Company: shenyang JY
import argparse, numbers, joblib
import numpy as np
import pandas as pd
from io import BytesIO
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler
from common.processing_data_common import missing_features, str_to_list
from common.data_cleaning import *


class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)

    def get_train_data(self, dfs, col_time, target, time_series=1):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - training data - shorter than time_step")
            if time_series == 2:
                datax, datay = self.get_timestep_features_lstm2(df, col_time, target, is_train=True)
            elif time_series == 3:
                datax, datay = self.get_timestep_features_bilstm(df, col_time, target, is_train=True)
            else:
                datax, datay = self.get_timestep_features(df, col_time, target, is_train=True)
            if len(datax) < 10:
                self.logger.info("Feature processing - training data - too short for the minimum split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay,
                                                    valid_rate=self.opt.Model["valid_data_rate"],
                                                    shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)

        train_y = np.concatenate(np.array([[y.iloc[:, 1].values for y in train_y]]), axis=0)
        valid_y = np.concatenate(np.array([[y.iloc[:, 1].values for y in valid_y]]), axis=0)

        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])

        return train_x, valid_x, train_y, valid_y

    def get_predict_data(self, dfs, time_series=1):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - prediction data - shorter than time_step")
                continue
            datax = self.get_predict_features(df, time_series)
            test_x.append(datax)
        test_x = np.concatenate(test_x, axis=0)
        return test_x

    def get_predict_features(self, norm_data, time_series=1):
        """
        Split the data into equal windows to build the prediction set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step *= int(time_series)
        time_step_loc = time_step - 1
        iters = int(len(feature_data)) // time_step
        end = int(len(feature_data)) % time_step
        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)])
        if end > 0:
            # pad the trailing remainder with copies of its first row to make a full window
            df = feature_data.tail(end)
            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[0]] * (time_step - end))]).reset_index(drop=True)
            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
        return features_x

    def get_timestep_features(self, norm_data, col_time, target, is_train):
        """
        Split the data with a sliding window to obtain the time-series training set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target] if is_train is True else [col_time, target]  # identical for train and test at present
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T': 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
        features_x, features_y = [], []
        for i, row in enumerate(zip(nwp, labels)):
            features_x.append(row[0])
            features_y.append(row[1])
        return features_x, features_y
    def get_timestep_features_lstm2(self, norm_data, col_time, target, is_train):
        """
        Split the data with a sliding window to obtain the time-series training set
        (the input spans two consecutive windows, the label is the second window).
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step*2 - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target] if is_train is True else [col_time, target]
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*2 + 1)]  # database fields, e.g. 'C_T': 'C_WS170'
        labels = [feature_data.loc[i+time_step:i+time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step*2 + 1)]
        features_x, features_y = [], []
        for i, row in enumerate(zip(nwp, labels)):
            features_x.append(row[0])
            features_y.append(row[1])
        return features_x, features_y

    def get_timestep_features_bilstm(self, norm_data, col_time, target, is_train):
        """
        Split the data with a sliding window to obtain the time-series training set
        (the input spans three consecutive windows, the label is the middle window).
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step*3 - 1
        time_step_m = time_step*2 - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target] if is_train is True else [col_time, target]
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*3 + 1)]  # database fields, e.g. 'C_T': 'C_WS170'
        labels = [feature_data.loc[i+time_step:i+time_step_m, label_features].reset_index(drop=True) for i in range(train_num - time_step*3 + 1)]
        features_x, features_y = [], []
        for i, row in enumerate(zip(nwp, labels)):
            features_x.append(row[0])
            features_y.append(row[1])
        return features_x, features_y

    def fill_train_data(self, unite, col_time):
        """
        Fill missing points in the training data.
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
        self.logger.info("Re-checked: total number of points to interpolate: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train

    def fill_pre_data(self, unite):
        unite = unite.interpolate(method='linear')  # fill NWP values by linear interpolation first
        unite = unite.ffill().bfill()  # then fill the edge points that interpolation cannot reach
        return unite

    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to fill: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, gaps in the time series: {n_short}, of which long gaps: {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs
    def data_fill(self, dfs, col_time, test=False):
        """
        Fill each segment to a regular 15-minute grid.
        """
        dfs_fill, inserts = [], 0
        for i, df in enumerate(dfs):
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation; other filling methods need further comparison
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments, {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill

    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        valid_index = indexs[-valid_size:]
        train_index = indexs[:-valid_size]
        tx, vx, ty, vy = [], [], [], []
        for i, data in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(data[0])
                ty.append(data[1])
            elif i in valid_index:
                vx.append(data[0])
                vy.append(data[1])
        return tx, vx, ty, vy

    def train_data_handler(self, data, bp_data=False, time_series=1):
        """
        Training data preprocessing: cleaning + gap filling + normalization
        Args:
            data: data loaded from mongo
            opt: argument namespace
        return:
            train_x, train_y, valid_x, valid_y, feature scaler bytes, target scaler bytes, scaled capacity
        """
        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
        # remove curtailment records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # select features, convert to numeric, sort by time
        train_data = data[[col_time] + features + [target]]
        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        train_data = train_data.sort_values(by=col_time)
        # drop days whose average feature missing rate exceeds 20%
        # train_data = missing_features(train_data, features, col_time)

        # feature preprocessing of the data after curtailment cleaning:
        # 1. clean null and abnormal values
        train_data_cleaned = cleaning(train_data, '训练集', self.logger, features + [target], col_time)
        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
        # 2. normalization (min-max scaling)
        # create scalers for the features and the target
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        # scale features and target
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0, 0]
        train_data_cleaned[self.opt.features] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        # 3. fill missing values
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # save both scalers
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # reset pointer to the beginning of the byte stream
        scaled_target_bytes.seek(0)

        if bp_data:
            train_data = pd.concat(train_datas, axis=0)
            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[self.opt.features].values,
                                                                        train_data[target].values,
                                                                        valid_rate=self.opt.Model["valid_data_rate"],
                                                                        shuffle=self.opt.Model['shuffle_train_data'])
            train_x, valid_x, train_y, valid_y = np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
        else:
            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target, time_series)
        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
    def pre_data_handler(self, data, feature_scaler, bp_data=False, time_series=1):
        """
        Simple preprocessing of the prediction data
        Args:
            data: data loaded from mongo
            feature_scaler: fitted feature scaler from training
        return:
            pre_x: normalized feature windows for prediction
            data: the sorted, numeric raw data
        """
        # remove curtailment records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
        #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
        col_time, features = self.opt.col_time, self.opt.features
        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
        if not set(features).issubset(set(data.columns.tolist())):
            raise ValueError("The prediction data does not contain the features required by the model!")
        pre_data = data[features].copy()
        if self.opt.Model['predict_data_fill']:
            pre_data = self.fill_pre_data(pre_data)
        pre_data.loc[:, features] = feature_scaler.transform(pre_data)

        if bp_data:
            pre_x = np.array(pre_data)
        else:
            pre_x = self.get_predict_data([pre_data], time_series)
        return pre_x, data
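

if __name__ == "__main__":
    # Minimal usage sketch, not part of the original pipeline: it only shows how
    # DataHandler is wired together. The column names and the values in `args`
    # below are illustrative assumptions; the real configuration and data come
    # from the caller and from MongoDB.
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("data_handler")

    args = {
        "col_time": "C_TIME",                # assumed name of the timestamp column
        "features": ["C_WS100", "C_WS170"],  # assumed NWP feature columns
        "target": "C_REAL_VALUE",            # assumed target (actual power) column
        "cap": 100.0,                        # assumed station capacity, same unit as the target
        "Model": {
            "time_step": 16,                 # points per window (15-minute resolution)
            "valid_data_rate": 0.2,
            "shuffle_train_data": False,
            "how_long_fill": 10,             # gaps of 10 * 15 min or more split the series instead of being interpolated
            "train_data_fill": True,
            "predict_data_fill": True,
        },
    }
    dh = DataHandler(logger, args)

    # With a DataFrame `df` loaded from MongoDB that contains col_time, the
    # feature columns and the target column, training data would be prepared as:
    # train_x, train_y, valid_x, valid_y, feat_scaler_bytes, tgt_scaler_bytes, scaled_cap = \
    #     dh.train_data_handler(df, bp_data=False, time_series=1)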