@@ -4,8 +4,12 @@
 # @Time :2025/1/8 14:56
 # @Author :David
 # @Company: shenyang JY
-import argparse
+import argparse, numbers, joblib
 import pandas as pd
+from io import BytesIO
+from bson.decimal128 import Decimal128
+from sklearn.preprocessing import MinMaxScaler
+from common.processing_data_common import missing_features, str_to_list
 from common.data_cleaning import *
 
 class DataHandler(object):
@@ -13,58 +17,92 @@ class DataHandler(object):
         self.logger = logger
         self.opt = argparse.Namespace(**args)
 
-    def get_train_data(self, df):
+    def get_train_data(self, dfs, col_time, features, target):
         train_x, valid_x, train_y, valid_y = [], [], [], []
-        if len(df) < self.opt.Model["time_step"]:
-            self.logger.info("Feature processing - training data has fewer rows than time_step")
-        datax, datay = self.get_timestep_features(df, is_train=True)
-        if len(datax) < 10:
-            self.logger.info("Feature processing - training data too small for a minimal split")
-        tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
-        train_x.extend(tx)
-        valid_x.extend(vx)
-        train_y.extend(ty)
-        valid_y.extend(vy)
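+        # each df is one continuous segment, so no window built below ever spans a time gap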
+        for i, df in enumerate(dfs, start=1):
+            if len(df) < self.opt.Model["time_step"]:
+                self.logger.info("Feature processing - training data has fewer rows than time_step")
+                continue
+            datax, datay = self.get_timestep_features(df, col_time, features, target, is_train=True)
+            if len(datax) < 10:
+                self.logger.info("Feature processing - training data too small for a minimal split")
+                continue
+            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x.extend(tx)
+            valid_x.extend(vx)
+            train_y.extend(ty)
+            valid_y.extend(vy)
 
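+        # each label frame is [col_time, target]; iloc[:, 1] keeps only the target values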
         train_y = np.concatenate([[y.iloc[:, 1].values for y in train_y]], axis=0)
         valid_y = np.concatenate([[y.iloc[:, 1].values for y in valid_y]], axis=0)
 
-        train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
-        valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]
+        train_x = np.array([x.values for x in train_x])
+        valid_x = np.array([x.values for x in valid_x])
 
         return train_x, valid_x, train_y, valid_y
 
-    def get_timestep_features(self, norm_data, is_train):  # optimized with pandas operations
+    def get_predict_data(self, dfs, features):
+        test_x = []
+        for i, df in enumerate(dfs, start=1):
+            if len(df) < self.opt.Model["time_step"]:
+                self.logger.info("Feature processing - prediction data has fewer rows than time_step")
+                continue
+            datax = self.get_predict_features(df, features)
+            test_x.extend(datax)
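+        # stack the windows into a single array of shape (n_windows, time_step, n_features)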
+        test_x = np.array(test_x)
+        return test_x
+
+    def get_predict_features(self, norm_data, features):
+        """
+        Split the data into equal-sized windows to build the prediction set
+        """
+        time_step = self.opt.Model["time_step"]
+        feature_data = norm_data.reset_index(drop=True)
+        time_step_loc = time_step - 1
+        iters = len(feature_data) // time_step
+        end = len(feature_data) % time_step
+        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, features].reset_index(drop=True) for i in range(iters)])
+        if end > 0:
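+            # pad the leftover tail rows to a full time_step window by repeating the tail's first row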
+            df = feature_data.tail(end)
+            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[0]] * (time_step - end))]).reset_index(drop=True)
+            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
+        return features_x
+
+    def get_timestep_features(self, norm_data, col_time, features, target, is_train):
+        """
+        Slide a fixed-step window over the data to build the time-series training set
+        """
         time_step = self.opt.Model["time_step"]
         feature_data = norm_data.reset_index(drop=True)
         time_step_loc = time_step - 1
         train_num = int(len(feature_data))
-        label_features = ['C_TIME', 'C_REAL_VALUE'] if is_train is True else ['C_TIME', 'C_REAL_VALUE']
-        nwp_cs = self.opt.features
+        label_features = [col_time, target]
+        nwp_cs = features
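+        # sliding window: index i covers rows [i, i + time_step - 1]; consecutive windows overlap by time_step - 1 rows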
         nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T': 'C_WS170'
         labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
         features_x, features_y = [], []
-        self.logger.info("Before matching environment data: {} groups -> ".format(len(nwp)))
         for i, row in enumerate(zip(nwp, labels)):
             features_x.append(row[0])
             features_y.append(row[1])
-        self.logger.info("After matching environment data: {} groups".format(len(features_x)))
         return features_x, features_y
 
-    def fill_train_data(self, unite):
-        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
-        unite['time_diff'] = unite['C_TIME'].diff()
+    def fill_train_data(self, unite, col_time):
+        """
+        Fill missing points in the training data
+        """
+        unite[col_time] = pd.to_datetime(unite[col_time])
+        unite['time_diff'] = unite[col_time].diff()
         dt_short = pd.Timedelta(minutes=15)
         dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
-        data_train = self.missing_time_splite(unite, dt_short, dt_long)
+        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
         miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
         miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
         self.logger.info("Re-check: total number of points to interpolate: {}".format(miss_number))
         if miss_number > 0 and self.opt.Model["train_data_fill"]:
-            data_train = self.data_fill(data_train)
+            data_train = self.data_fill(data_train, col_time)
         return data_train
 
-    def missing_time_splite(self, df, dt_short, dt_long):
+    def missing_time_splite(self, df, dt_short, dt_long, col_time):
+        df.reset_index(drop=True, inplace=True)
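+        # positional lookups like df['time_diff'][i] below require a clean 0..n-1 index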
         n_long, n_short, n_points = 0, 0, 0
         start_index = 0
         dfs = []
@@ -75,7 +113,7 @@ class DataHandler(object):
                 start_index = i
                 n_long += 1
             if df['time_diff'][i] > dt_short:
-                self.logger.info(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
+                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                 points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
                 self.logger.info("Number of missing points: {}".format(points))
                 if df['time_diff'][i] < dt_long:
@@ -87,11 +125,11 @@ class DataHandler(object):
         self.logger.info("Total number of points to fill: {}".format(n_points))
         return dfs
 
-    def data_fill(self, dfs, test=False):
+    def data_fill(self, dfs, col_time, test=False):
         dfs_fill, inserts = [], 0
         for i, df in enumerate(dfs):
             df = rm_duplicated(df, self.logger)
-            df1 = df.set_index('C_TIME', inplace=False)
+            df1 = df.set_index(col_time, inplace=False)
             dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation; other fill methods still need comparison
             dff.reset_index(inplace=True)
             points = len(dff) - len(df1)
@@ -117,3 +155,79 @@ class DataHandler(object):
             vx.append(data[0])
             vy.append(data[1])
         return tx, vx, ty, vy
+
+    def train_data_handler(self, data, opt, bp_data=False):
+        """
+        Training-data preprocessing:
+        cleaning + gap filling + normalization
+        Args:
+            data: data loaded from mongo
+            opt: argument namespace
+        return:
+            x_train
+            x_valid
+            y_train
+            y_valid
+        """
+        col_time, features, target = opt.col_time, opt.features, opt.target
+        # drop records already flagged as curtailment (power-limit) periods
+        if 'is_limit' in data.columns:
+            data = data[data['is_limit'] == False]
+        # select the feature columns and coerce them to numeric
+        train_data = data[[col_time] + features + [target]]
+        # drop days whose average feature missing rate exceeds 20%
+        # train_data = missing_features(train_data, features, col_time)
+        train_data = train_data.sort_values(by=col_time)
+        # train_data = train_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
+        # after curtailment cleaning, preprocess the features: 1. drop rows with nulls/outliers 2. fill gaps
+        train_data_cleaned = key_field_row_cleaning(train_data, features + [target], self.logger)
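+        # mongo can return Decimal128 values; coerce them (and any other numerics) to plain float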
+        train_data_cleaned = train_data_cleaned.applymap(
+            lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        # create separate min-max scalers for the features and the target
+        train_scaler = MinMaxScaler(feature_range=(0, 1))
+        target_scaler = MinMaxScaler(feature_range=(0, 1))
+        # normalize the features and the target
+        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features])
+        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
+        train_data_cleaned[features] = scaled_train_data
+        train_data_cleaned[[target]] = scaled_target
+
+        train_datas = self.fill_train_data(train_data_cleaned, col_time)
+        # save both scalers
+        scaled_train_bytes = BytesIO()
+        scaled_target_bytes = BytesIO()
+
+        joblib.dump(train_scaler, scaled_train_bytes)
+        joblib.dump(target_scaler, scaled_target_bytes)
+        scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
+        scaled_target_bytes.seek(0)
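+        # in-memory buffers let the caller persist the fitted scalers (e.g. back to mongo) beside the trained model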
+
+        if bp_data:
+            train_data = pd.concat(train_datas, axis=0)
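+            # NOTE: scaled_target was computed before fill_train_data, so its length may diverge from train_data here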
+            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data, scaled_target, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+        else:
+            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
+        return train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes
+
+    def pre_data_handler(self, data, feature_scaler, opt, bp_data=False):
+        """
+        Light preprocessing of prediction data
+        Args:
+            data: data loaded from mongo
+            opt: argument namespace
+        return:
+            pre_x: normalized feature data, windowed for the sequence models
+        """
+        if 'is_limit' in data.columns:
+            data = data[data['is_limit'] == False]
+        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
+        #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
+        col_time, features = opt.col_time, opt.features
+        pre_data = data.sort_values(by=col_time)[features]
+        scaled_features = feature_scaler.transform(pre_data[features])
+        pre_data[features] = scaled_features
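+        # mirror train_data_handler: flat feature rows for the BP model, sliding windows for the sequence models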
+        if bp_data:
+            pre_x = pre_data.values
+        else:
+            pre_x = self.get_predict_data([pre_data], features)
+        return pre_x