#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :data_handler.py
# @Time :2025/1/8 14:56
# @Author :David
# @Company: shenyang JY
import argparse, numbers, joblib
import numpy as np
import pandas as pd
from io import BytesIO
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler
from common.processing_data_common import missing_features, str_to_list
from common.data_cleaning import *


class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)
    def get_train_data(self, dfs, col_time, target, time_series=1, lstm_type=1):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - training data - shorter than time_step, skipping")
                continue
            if lstm_type == 2:
                datax, datay = self.get_timestep_features_lstm2(df, col_time, target, is_train=True, time_series=time_series)
            elif lstm_type == 3:
                datax, datay = self.get_timestep_features_bilstm(df, col_time, target, is_train=True, time_series=time_series)
            else:
                datax, datay = self.get_timestep_features(df, col_time, target, is_train=True, time_series=time_series)
            if len(datax) < 10:
                self.logger.info("Feature processing - training data - too few windows for a minimal split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)
        train_y = np.array([y.iloc[:, 1].values for y in train_y])
        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])
        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])
        return train_x, valid_x, train_y, valid_y
    def get_predict_data(self, dfs, time_series=1, lstm_type=1):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"] * time_series:
                self.logger.info("Feature processing - prediction data - shorter than time_step * time_series, skipping")
                continue
            if lstm_type == 2:
                datax = self.get_predict_features2(df, time_series)
            elif lstm_type == 3:
                datax = self.get_predict_features3(df, time_series)
            else:
                datax = self.get_predict_features(df, time_series)
            test_x.append(datax)
        test_x = np.concatenate(test_x, axis=0)
        return test_x
    def get_predict_features(self, norm_data, time_series=1):
        """
        Split the data into equal-length windows to build the prediction set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step *= int(time_series)
        time_step_loc = time_step - 1
        iters = int(len(feature_data)) // time_step
        end = int(len(feature_data)) % time_step
        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)])
        if end > 0:
            df = feature_data.tail(end)
            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[-1]] * (time_step - end))]).reset_index(drop=True)
            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
        return features_x
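    # Worked example (illustrative numbers, not from the source): with time_step=16,
    # time_series=1 and 100 normalized rows, iters = 100 // 16 = 6 full windows and
    # end = 100 % 16 = 4 leftover rows; the tail window repeats its last row
    # 16 - 4 = 12 times, so features_x has shape (7, 16, len(self.opt.features)).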
    def get_predict_features2(self, norm_data, time_series=2):
        """
        time_series=2: slice the data into time_step-sized atomic segments, then
        assemble the prediction set from the required number of consecutive segments.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step * time_series - 1
        iters = int(len(feature_data)) // time_step
        iters = iters - (time_series - 1)
        end = int(len(feature_data)) % time_step
        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(0, iters)])
        if end > 0:
            df = norm_data.tail(end)
            df_repeated = pd.concat([norm_data.tail((time_series - 1) * time_step)] + [df] + [df.tail(1)] * (time_step - end)).reset_index(drop=True)
            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
        return features_x
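    # Worked example (illustrative numbers): with time_step=16, time_series=2 and
    # 100 rows, each window spans 32 rows and iters = 100 // 16 - 1 = 5 windows
    # stepping by 16 rows. The 4 leftover rows are prefixed with the preceding
    # 16-row segment and padded with their last row up to a 6th 32-row window.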
    def get_predict_features3(self, norm_data, time_series=3):
        """
        time_series=3: slice the data into time_step-sized atomic segments, then
        assemble the prediction set from the required number of consecutive segments.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step * time_series - 1
        features_x = np.array([x for x in [feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True)
                                           for i in range(0, len(norm_data), (time_series - 2) * time_step)]
                               if not len(x) < time_step * time_series])
        end = norm_data.tail(len(feature_data) - (features_x.shape[0] * time_step) - time_step).reset_index(drop=True)
        num = len(end) // ((time_series - 2) * time_step)
        bu = len(end) % ((time_series - 2) * time_step)
        front = norm_data.tail(time_step)
        back = norm_data.tail(time_step)
        df_repeated = [pd.concat([front] + [end.iloc[i*time_step:i*time_step + time_step].reset_index(drop=True)] + [back]) for i in range(0, num)]
        if bu > 0:
            last = pd.concat([front] + [end.tail(bu)] + [end.tail(1)] * (time_step - bu) + [back])
            df_repeated = np.array(df_repeated + [last])
        else:
            df_repeated = np.array(df_repeated)
        features_x = np.concatenate((features_x, df_repeated), axis=0)
        return features_x
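    # Note on padding: the remainder is wrapped with norm_data.tail(time_step) used as
    # both the front and back context blocks, so every emitted window keeps the
    # time_step*time_series row count the model expects.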
    def get_timestep_features(self, norm_data, col_time, target, is_train, time_series=1):
        """
        Split the data with a sliding window of time_step to build the sequential training set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step * time_series - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target]  # same columns whether training or predicting
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]  # database fields 'C_T': 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
        features_x, features_y = [], []
        for nwp_window, label_window in zip(nwp, labels):
            features_x.append(nwp_window)
            features_y.append(label_window)
        return features_x, features_y
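    # Window-count sketch (derived from the ranges above): a frame with N rows yields
    # N - time_step*time_series + 1 overlapping windows, each pairing a
    # (time_step*time_series, n_features) input with a label frame of the same span.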
    def get_timestep_features_lstm2(self, norm_data, col_time, target, is_train, time_series=2):
        """
        Split the data with a sliding window; use only the last time_step block as the label window.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step * time_series - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target]  # same columns whether training or predicting
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]  # database fields 'C_T': 'C_WS170'
        labels = [feature_data.loc[i+time_step_loc-time_step+1: i+time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
        features_x, features_y = [], []
        for nwp_window, label_window in zip(nwp, labels):
            features_x.append(nwp_window)
            features_y.append(label_window)
        return features_x, features_y
    def get_timestep_features_bilstm(self, norm_data, col_time, target, is_train, time_series=3):
        """
        Split the data with a sliding window; use the middle time_step block as the label window.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step * time_series - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target]  # same columns whether training or predicting
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]  # database fields 'C_T': 'C_WS170'
        labels = [feature_data.loc[i+time_step: i+time_step_loc-time_step, label_features].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
        features_x, features_y = [], []
        for nwp_window, label_window in zip(nwp, labels):
            features_x.append(nwp_window)
            features_y.append(label_window)
        return features_x, features_y
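    # Label layout by lstm_type (inputs always span time_step*time_series rows):
    #   lstm_type=1 (get_timestep_features): labels cover the full window;
    #   lstm_type=2 (get_timestep_features_lstm2): labels cover only the last time_step rows;
    #   lstm_type=3 (get_timestep_features_bilstm): labels cover the window minus one
    #   time_step of context on each side.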
    def fill_train_data(self, unite, col_time):
        """
        Fill missing values in the training series.
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
        self.logger.info("Re-check: total number of points to interpolate: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train
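    # Gap arithmetic (illustrative): a 75-minute gap in a 15-minute series contributes
    # 75*60 / (15*60) - 1 = 4 missing points; gaps of dt_long or more (15 * how_long_fill
    # minutes) split the series in missing_time_splite instead of being interpolated.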
    def fill_pre_data(self, unite):
        unite = unite.interpolate(method='linear')  # linearly interpolate the NWP features first
        unite = unite.ffill().bfill()  # then forward/back fill edge points interpolation cannot reach
        return unite
    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to fill: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, short gaps in the series: {n_short}, long gaps: {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs
    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for i, df in enumerate(dfs):
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15min').interpolate(method='linear')  # linear interpolation; other fill methods still need comparison
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments; {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill
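    # Behavior sketch (illustrative timestamps): a segment covering 00:00 ~ 01:00 with
    # the 00:30 row missing is reindexed by resample('15min') to five rows and the gap
    # linearly interpolated, so points = 5 - 4 = 1 is logged for that segment.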
    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        valid_index = indexs[-valid_size:] if valid_size > 0 else []  # guard: indexs[-0:] would select everything
        train_index = indexs[:-valid_size] if valid_size > 0 else indexs
        tx, vx, ty, vy = [], [], [], []
        for i, data in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(data[0])
                ty.append(data[1])
            elif i in valid_index:
                vx.append(data[0])
                vy.append(data[1])
        return tx, vx, ty, vy
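    # Example (illustrative): with 100 windows and valid_rate=0.2, valid_size = 20,
    # so the last 20 entries of the (optionally shuffled) index list form the
    # validation split and the other 80 the training split; each split keeps the
    # original window order because membership is tested index by index.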
    def train_data_handler(self, data, bp_data=False, time_series=1, lstm_type=1):
        """
        Training-data preprocessing:
        cleaning + gap filling + normalization
        Args:
            data: data loaded from mongo
            opt: argument namespace
        return:
            train_x, train_y, valid_x, valid_y,
            feature-scaler bytes, target-scaler bytes, scaled cap
        """
        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # Select features, coerce to numeric, sort by time
        train_data = data[[col_time] + features + [target]]
        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        train_data = train_data.sort_values(by=col_time)
        # Drop days whose average feature missing rate exceeds 20%
        # train_data = missing_features(train_data, features, col_time)
        # Feature preprocessing on the curtailment-cleaned data:
        # 1. Remove NaNs and outliers
        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time)
        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
        # 2. Normalization
        # Create scalers for the features and the target
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        # Scale the features and the target
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0, 0]
        train_data_cleaned[self.opt.features] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        # 3. Fill missing values
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # Serialize both scalers
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # reset pointer to the beginning of the byte stream
        scaled_target_bytes.seek(0)
        if bp_data:
            train_x, valid_x, train_y, valid_y = self.train_valid_split(pd.concat(train_datas, axis=0)[self.opt.features].values, pd.concat(train_datas, axis=0)[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x, valid_x, train_y, valid_y = np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
        else:
            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target, time_series, lstm_type)
        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
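    # Usage sketch (hedged; the variable names are illustrative). The returned tuple
    # unpacks as:
    #   train_x, train_y, valid_x, valid_y, feature_scaler_bytes, target_scaler_bytes, scaled_cap = \
    #       handler.train_data_handler(df)
    # Both BytesIO streams are already rewound, so the scalers can be reloaded with
    # joblib.load(feature_scaler_bytes) when the model is served.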
    def pre_data_handler(self, data, feature_scaler, bp_data=False, time_series=1, lstm_type=1):
        """
        Light preprocessing of prediction data
        Args:
            data: data loaded from mongo
            opt: argument namespace
        return:
            pre_x: normalized feature windows
        """
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
        # args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
        col_time, features = self.opt.col_time, self.opt.features
        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
        if not set(features).issubset(set(data.columns.tolist())):
            raise ValueError("Prediction data does not contain the features required by the model!")
        pre_data = data[features].copy()
        if self.opt.Model['predict_data_fill']:
            pre_data = self.fill_pre_data(pre_data)
        pre_data.loc[:, features] = feature_scaler.transform(pre_data)
        if bp_data:
            pre_x = np.array(pre_data)
        else:
            pre_x = self.get_predict_data([pre_data], time_series, lstm_type)
        return pre_x, data
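

if __name__ == "__main__":
    # Minimal smoke test: a sketch under assumptions -- the common.* imports above
    # resolve on sys.path, and the feature/column names and config values below are
    # illustrative, not the production configuration.
    import logging

    logging.basicConfig(level=logging.INFO)
    demo_args = {
        "features": ["C_WS170"],   # hypothetical NWP feature column
        "col_time": "C_TIME",      # hypothetical timestamp column
        "target": "C_REAL_VALUE",  # hypothetical target column
        "cap": 100,
        "Model": {
            "time_step": 16,
            "valid_data_rate": 0.2,
            "shuffle_train_data": False,
            "how_long_fill": 10,
            "train_data_fill": True,
            "predict_data_fill": True,
        },
    }
    handler = DataHandler(logging.getLogger("data_handler"), demo_args)
    # 40 normalized rows -> 2 full 16-row windows plus 1 tail window padded by
    # repeating the last row, i.e. an array of shape (3, 16, 1).
    demo = pd.DataFrame({"C_WS170": np.linspace(0.0, 1.0, 40)})
    print(handler.get_predict_features(demo, time_series=1).shape)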