#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @FileName : data_handler.py
# @Time     : 2025/1/8 14:56
# @Author   : David
# @Company  : shenyang JY
import argparse
import numbers
from io import BytesIO

import joblib
import numpy as np
import pandas as pd
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler

from common.processing_data_common import missing_features, str_to_list
from common.data_cleaning import *


class DataHandler(object):
    """Data preprocessing for training and prediction: cleaning, gap filling,
    normalization, and sliding-window feature construction."""

    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)

    def get_train_data(self, dfs, col_time, target, time_series=1):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing, training data: series shorter than time_step, skipping")
                continue
            if time_series == 2:
                datax, datay = self.get_timestep_features_lstm2(df, col_time, target, is_train=True)
            elif time_series == 3:
                datax, datay = self.get_timestep_features_bilstm(df, col_time, target, is_train=True)
            else:
                datax, datay = self.get_timestep_features(df, col_time, target, is_train=True)
            if len(datax) < 10:
                self.logger.info("Feature processing, training data: too few windows for a train/valid split, skipping")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"],
                                                    shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)
        # Each label window is a DataFrame with columns [col_time, target];
        # column 1 is the target series.
        train_y = np.array([y.iloc[:, 1].values for y in train_y])
        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])
        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])
        return train_x, valid_x, train_y, valid_y
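
    # Shape note (derived from the splits below): with N windows, window
    # factor k = time_series and F = len(self.opt.features), the arrays are
    #   train_x / valid_x : (N, k * time_step, F)
    #   train_y / valid_y : (N, time_step)
    # because every label window keeps only the target column.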

    def get_predict_data(self, dfs, time_series=1):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing, prediction data: series shorter than time_step, skipping")
                continue
            datax = self.get_predict_features(df, time_series)
            test_x.append(datax)
        test_x = np.concatenate(test_x, axis=0)
        return test_x

    def get_predict_features(self, norm_data, time_series=1):
        """
        Split the data into equal, non-overlapping windows to build the
        prediction set; a leftover tail is padded up to a full window.
        """
        time_step = self.opt.Model["time_step"] * int(time_series)
        time_step_loc = time_step - 1
        feature_data = norm_data.reset_index(drop=True)
        # Use the (possibly multiplied) window length consistently; dividing
        # by the raw time_step overruns the frame when time_series > 1.
        iters = len(feature_data) // time_step
        end = len(feature_data) % time_step
        windows = [feature_data.loc[i * time_step:i * time_step + time_step_loc, self.opt.features].reset_index(drop=True)
                   for i in range(iters)]
        if end > 0:
            # Pad the leftover tail to a full window by repeating its first row.
            df = feature_data.tail(end)[self.opt.features]
            pad = pd.DataFrame([df.iloc[0]] * (time_step - end))
            windows.append(pd.concat([df, pad]).reset_index(drop=True))
        return np.array([w.values for w in windows])
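
    # Worked example (illustrative): with time_step=16, time_series=1 and a
    # 40-row frame, iters=2 and end=8; two full windows cover rows 0-15 and
    # 16-31, and rows 32-39 are padded with eight copies of row 32 to form a
    # third window, giving an array of shape (3, 16, len(features)).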

    def get_timestep_features(self, norm_data, col_time, target, is_train):
        """
        Split the data with a sliding window of length time_step to build the
        supervised training set; features and labels cover the same rows.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = len(feature_data)
        label_features = [col_time, target]  # is_train kept for API symmetry; labels are identical either way
        nwp_cs = self.opt.features  # database fields, e.g. 'C_T': 'C_WS170'
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
        return nwp, labels

    def get_timestep_features_lstm2(self, norm_data, col_time, target, is_train):
        """
        Sliding-window split for the two-segment LSTM: each input window spans
        2 * time_step rows and the label covers the second half.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step * 2 - 1
        train_num = len(feature_data)
        label_features = [col_time, target]
        nwp_cs = self.opt.features  # database fields, e.g. 'C_T': 'C_WS170'
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step * 2 + 1)]
        labels = [feature_data.loc[i + time_step:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step * 2 + 1)]
        return nwp, labels

    def get_timestep_features_bilstm(self, norm_data, col_time, target, is_train):
        """
        Sliding-window split for the BiLSTM: each input window spans
        3 * time_step rows and the label covers the middle segment.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step * 3 - 1
        time_step_m = time_step * 2 - 1
        train_num = len(feature_data)
        label_features = [col_time, target]
        nwp_cs = self.opt.features  # database fields, e.g. 'C_T': 'C_WS170'
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step * 3 + 1)]
        labels = [feature_data.loc[i + time_step:i + time_step_m, label_features].reset_index(drop=True) for i in range(train_num - time_step * 3 + 1)]
        return nwp, labels
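
    # Window/label alignment of the three split modes (T = time_step):
    #   time_series=1 : x rows [i, i+T)  -> y rows [i, i+T)      (aligned)
    #   time_series=2 : x rows [i, i+2T) -> y rows [i+T, i+2T)   (second half)
    #   time_series=3 : x rows [i, i+3T) -> y rows [i+T, i+2T)   (middle third)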

    def fill_train_data(self, unite, col_time):
        """
        Fill gaps in the training data: short gaps are interpolated,
        long gaps split the series into separate segments.
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_split(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum() / (15 * 60) - len(miss_points)
        self.logger.info("Re-check: total number of points to interpolate: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train
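
    # Gap-handling example (illustrative, assuming 15-minute sampling and
    # how_long_fill=10, i.e. dt_long = 150 min): a 45-minute gap (2 missing
    # points) falls inside (15min, 150min) and is interpolated by data_fill
    # (when Model['train_data_fill'] is set), while a 3-hour gap splits the
    # series into separate training segments instead.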

    def fill_pre_data(self, unite):
        unite = unite.interpolate(method='linear')  # linear interpolation of the NWP features first
        unite = unite.ffill().bfill()  # then fill the edge points interpolation cannot reach
        return unite

    def missing_time_split(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to fill: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, gaps in the series: {n_short}, of which long gaps: {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs

    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for df in dfs:
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15min').interpolate(method='linear')  # linear interpolation; other methods would need a comparison
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments; {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill

    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        indexs = np.random.permutation(len(datax)).tolist() if shuffle else list(range(len(datax)))
        valid_size = int(len(datax) * valid_rate)
        # Guard against valid_size == 0: indexs[-0:] would put *everything*
        # into the validation set.
        valid_index = set(indexs[-valid_size:]) if valid_size > 0 else set()
        tx, vx, ty, vy = [], [], [], []
        for i, (x, y) in enumerate(zip(datax, datay)):
            if i in valid_index:
                vx.append(x)
                vy.append(y)
            else:
                tx.append(x)
                ty.append(y)
        return tx, vx, ty, vy
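
    # Example (illustrative): valid_rate=0.15 over 100 windows gives
    # valid_size=15; with shuffle=True the validation set is a random
    # 15-window subset, otherwise it is the last 15 windows in order.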

    def train_data_handler(self, data, bp_data=False, time_series=1):
        """
        Training data preprocessing:
        cleaning + gap filling + normalization
        Args:
            data: data loaded from MongoDB
            bp_data: if True, build flat (non-windowed) samples for the BP model
            time_series: window factor (1, 2 or 3)
        Returns:
            train_x, train_y, valid_x, valid_y, plus the two serialized
            scalers and the normalized capacity
        """
        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # Select the feature columns, convert to numeric, and sort by time
        train_data = data[[col_time] + features + [target]]
        train_data = train_data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        train_data = train_data.sort_values(by=col_time)
        # Drop days whose average feature missing rate exceeds 20%
        # train_data = missing_features(train_data, features, col_time)
        # Feature preprocessing of the curtailment-cleaned data:
        # 1. Clean NaNs and outliers
        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time)
        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
        # 2. Normalization
        # Separate scalers for the features and the target
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0, 0]
        train_data_cleaned[self.opt.features] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        # 3. Fill missing values
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # Serialize both scalers
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # reset the pointers to the start of the byte streams
        scaled_target_bytes.seek(0)
        if bp_data:
            train_data = pd.concat(train_datas, axis=0)
            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[self.opt.features].values, train_data[target].values,
                                                                        valid_rate=self.opt.Model["valid_data_rate"],
                                                                        shuffle=self.opt.Model['shuffle_train_data'])
            train_x, valid_x, train_y, valid_y = np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
        else:
            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target, time_series)
        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap

    def pre_data_handler(self, data, feature_scaler, bp_data=False, time_series=1):
        """
        Light preprocessing of the prediction data.
        Args:
            data: data loaded from MongoDB
            feature_scaler: fitted scaler used to normalize the features
        Returns:
            pre_x: normalized feature windows
            data: the sorted raw data
        """
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
        #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
        col_time, features = self.opt.col_time, self.opt.features
        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
        if not set(features).issubset(set(data.columns.tolist())):
            raise ValueError("Prediction data is missing features required by the model!")
        pre_data = data[features].copy()
        if self.opt.Model['predict_data_fill']:
            pre_data = self.fill_pre_data(pre_data)
        pre_data.loc[:, features] = feature_scaler.transform(pre_data)
        if bp_data:
            pre_x = np.array(pre_data)
        else:
            pre_x = self.get_predict_data([pre_data], time_series)
        return pre_x, data
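

if __name__ == "__main__":
    # Minimal smoke test (illustrative sketch only): exercises the
    # prediction-window split on synthetic, already-normalized data. The args
    # below carry just the keys this code path reads; real configurations
    # hold many more Model entries, and the feature names here are made up.
    import logging
    logging.basicConfig(level=logging.INFO)
    demo_args = {
        "Model": {"time_step": 16},
        "features": ["ws_70", "ws_100"],
        "col_time": "date_time",
        "target": "power",
    }
    dh = DataHandler(logging.getLogger("data_handler"), demo_args)
    frame = pd.DataFrame(np.random.rand(40, 2), columns=demo_args["features"])
    windows = dh.get_predict_features(frame, time_series=1)
    print(windows.shape)  # expected: (3, 16, 2), last window padded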