#!/usr/bin/env python # -*- coding:utf-8 -*- # @FileName :custom_data_handler.py # @Time :2025/05/10 16:06 # @Author :David # @Company: shenyang JY import argparse, numbers, joblib from unittest.mock import inplace import numpy as np import pandas as pd from io import BytesIO from bson.decimal128 import Decimal128 from sklearn.preprocessing import MinMaxScaler from sqlalchemy.dialects.mssql.information_schema import columns from common.processing_data_common import missing_features, str_to_list from common.data_cleaning import * class CustomDataHandler(object): def __init__(self, logger, args): self.logger = logger self.opt = argparse.Namespace(**args) def get_train_data(self, dfs, col_time, target, time_series=1): train_x, valid_x, train_y, valid_y = [], [], [], [] for i, df in enumerate(dfs, start=1): if len(df) < self.opt.Model["time_step"]: self.logger.info("特征处理-训练数据-不满足time_step") datax, datay = self.get_timestep_features_zone(df, col_time, target, is_train=True, time_series=time_series) if len(datax) < 10: self.logger.info("特征处理-训练数据-无法进行最小分割") continue tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data']) train_x.extend(tx) valid_x.extend(vx) train_y.extend(ty) valid_y.extend(vy) train_y = [np.array([y[0].values for y in train_y]), np.array([y[1].iloc[:, 1].values for y in train_y])] valid_y = [np.array([y[0].values for y in valid_y]), np.array([y[1].iloc[:, 1].values for y in valid_y])] train_x = np.array([x.values for x in train_x]) valid_x = np.array([x.values for x in valid_x]) return train_x, valid_x, train_y, valid_y def get_predict_data(self, dfs, time_series=1): test_x = [] for i, df in enumerate(dfs, start=1): if len(df) < self.opt.Model["time_step"]*time_series: self.logger.info("特征处理-预测数据-不满足time_step") continue datax = self.get_predict_features(df, time_series) test_x.append(datax) test_x = np.concatenate(test_x, axis=0) return test_x def get_predict_features(self, norm_data, time_series=1): """ 均分数据,获取预测数据集 """ time_step = self.opt.Model["time_step"] feature_data = norm_data.loc[:, self.opt.features].reset_index(drop=True) time_step *= int(time_series) time_step_loc = time_step - 1 iters = int(len(feature_data)) // time_step end = int(len(feature_data)) % time_step features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)]) if end > 0: df = feature_data.tail(end) df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[-1]]* (time_step-end))]).reset_index(drop=True) features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0) return features_x def get_timestep_features(self, norm_data, col_time, target, is_train, time_series=1): """ 步长分割数据,获取时序训练集 """ time_step = self.opt.Model["time_step"] feature_data = norm_data.reset_index(drop=True) time_step_loc = time_step*time_series - 1 train_num = int(len(feature_data)) label_features = [col_time, target] if is_train is True else [col_time, target] nwp_cs = self.opt.features nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)] # 数据库字段 'C_T': 'C_WS170' labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)] features_x, features_y = [], [] for i, row in enumerate(zip(nwp, labels)): features_x.append(row[0]) features_y.append(row[1]) return features_x, features_y def get_timestep_features_zone(self, norm_data, col_time, target, is_train, time_series): """ 步长分割数据,分区建模 """ time_step = self.opt.Model["time_step"] feature_data = norm_data.reset_index(drop=True) time_step_loc = time_step*time_series - 1 train_num = int(len(feature_data)) label_features_power = [col_time, target] if is_train is True else [col_time, target] label_features_zone = self.opt.zone nwp_cs = [x for x in self.opt.features if x not in self.opt.zone] nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)] labels_power = [feature_data.loc[i:i + time_step_loc, label_features_power].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)] labels_zone = [feature_data.loc[i:i + time_step_loc, label_features_zone].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)] features_x, features_y = [], [] for i, row in enumerate(zip(nwp, labels_power, labels_zone)): features_x.append(row[0]) features_y.append([row[2], row[1]]) return features_x, features_y def fill_train_data(self, unite, col_time): """ 补值 """ unite[col_time] = pd.to_datetime(unite[col_time]) unite['time_diff'] = unite[col_time].diff() dt_short = pd.Timedelta(minutes=15) dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill']) data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time) miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)] miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points) self.logger.info("再次测算,需要插值的总点数为:{}".format(miss_number)) if miss_number > 0 and self.opt.Model["train_data_fill"]: data_train = self.data_fill(data_train, col_time) return data_train def fill_pre_data(self, unite): unite = unite.interpolate(method='linear') # nwp先进行线性填充 unite = unite.ffill().bfill() # 再对超过采样边缘无法填充的点进行二次填充 return unite def missing_time_splite(self, df, dt_short, dt_long, col_time): df.reset_index(drop=True, inplace=True) n_long, n_short, n_points = 0, 0, 0 start_index = 0 dfs = [] for i in range(1, len(df)): if df['time_diff'][i] >= dt_long: df_long = df.iloc[start_index:i, :-1] dfs.append(df_long) start_index = i n_long += 1 if df['time_diff'][i] > dt_short: self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}") points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1 self.logger.info("缺失点数:{}".format(points)) if df['time_diff'][i] < dt_long: n_short += 1 n_points += points self.logger.info("需要补值的点数:{}".format(points)) dfs.append(df.iloc[start_index:, :-1]) self.logger.info(f"数据总数:{len(df)}, 时序缺失的间隔:{n_short}, 其中,较长的时间间隔:{n_long}") self.logger.info("需要补值的总点数:{}".format(n_points)) return dfs def data_fill(self, dfs, col_time, test=False): dfs_fill, inserts = [], 0 for i, df in enumerate(dfs): df = rm_duplicated(df, self.logger) df1 = df.set_index(col_time, inplace=False) dff = df1.resample('15T').interpolate(method='linear') # 采用线性补值,其他补值方法需要进一步对比 dff.reset_index(inplace=True) points = len(dff) - len(df1) dfs_fill.append(dff) self.logger.info("{} ~ {} 有 {} 个点, 填补 {} 个点.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points)) inserts += points name = "预测数据" if test is True else "训练集" self.logger.info("{}分成了{}段,实际一共补值{}点".format(name, len(dfs_fill), inserts)) return dfs_fill def train_valid_split(self, datax, datay, valid_rate, shuffle): shuffle_index = np.random.permutation(len(datax)) indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist() valid_size = int(len(datax) * valid_rate) valid_index = indexs[-valid_size:] train_index = indexs[:-valid_size] tx, vx, ty, vy = [], [], [], [] for i, data in enumerate(zip(datax, datay)): if i in train_index: tx.append(data[0]) ty.append(data[1]) elif i in valid_index: vx.append(data[0]) vy.append(data[1]) return tx, vx, ty, vy def train_data_handler(self, data, time_series=1): """ 训练数据预处理: 清洗+补值+归一化 Args: data: 从mongo中加载的数据 opt:参数命名空间 return: x_train x_valid y_train y_valid """ col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target # 清洗限电记录 if 'is_limit' in data.columns: data = data[data['is_limit'] == False] # 筛选特征,数值化,排序 train_data = data[[col_time] + features + [target] + self.opt.zone] train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x) train_data = train_data.sort_values(by=col_time) # 清洗特征平均缺失率大于20%的天 # train_data = missing_features(train_data, features, col_time) # 对清洗完限电的数据进行特征预处理: # 1.空值异常值清洗 train_data_cleaned = cleaning(train_data, '训练集', self.logger, features + [target], col_time) self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features] # 2. 标准化 # 创建特征和目标的标准化器 train_scaler = MinMaxScaler(feature_range=(0, 1)) target_scaler = MinMaxScaler(feature_range=(0, 1)) # 标准化特征和目标 scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features+self.opt.zone]) scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]]) scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0,0] train_data_cleaned[self.opt.features+self.opt.zone] = scaled_train_data train_data_cleaned[[target]] = scaled_target # 3.缺值补值 train_datas = self.fill_train_data(train_data_cleaned, col_time) # 保存两个scaler scaled_train_bytes = BytesIO() scaled_target_bytes = BytesIO() joblib.dump(train_scaler, scaled_train_bytes) joblib.dump(target_scaler, scaled_target_bytes) scaled_train_bytes.seek(0) # Reset pointer to the beginning of the byte stream scaled_target_bytes.seek(0) train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target, time_series) return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap def pre_data_handler(self, data, feature_scaler, time_series=1): """ 预测数据简单处理 Args: data: 从mongo中加载的数据 opt:参数命名空间 return: scaled_features: 反归一化的特征 """ # 清洗限电记录 if 'is_limit' in data.columns: data = data[data['is_limit'] == False] # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int( # args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve']) col_time, features = self.opt.col_time, self.opt.features data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x) data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False) if not set(features).issubset(set(data.columns.tolist())): raise ValueError("预测数据特征不满足模型特征!") pre_data = data[features].copy() pre_data[self.opt.zone] = 1 if self.opt.Model['predict_data_fill']: pre_data = self.fill_pre_data(pre_data) scaled_pre_data = feature_scaler.transform(pre_data)[:, :len(features)] pre_data.drop(columns=self.opt.zone, inplace=True) pre_data.loc[:, features] = scaled_pre_data pre_x = self.get_predict_data([pre_data], time_series) return pre_x, data