#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :custom_data_handler.py
# @Time :2025/05/10 16:06
# @Author :David
# @Company: shenyang JY
import argparse, numbers, joblib
import numpy as np
import pandas as pd
from io import BytesIO
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler
from common.processing_data_common import missing_features, str_to_list
from common.data_cleaning import *


class CustomDataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)
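
    # get_train_data builds the supervised training tensors. Per the code below,
    # train_x / valid_x are float arrays of shape (n_samples, time_step * time_series, n_features),
    # while train_y / valid_y are two-element lists: [zone targets of shape
    # (n_samples, window, n_zone_columns), power targets of shape (n_samples, window)].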
    def get_train_data(self, dfs, col_time, target, time_series=1):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - training data - shorter than time_step")
                continue  # skip segments shorter than one window
            datax, datay = self.get_timestep_features_zone(df, col_time, target, is_train=True, time_series=time_series)
            if len(datax) < 10:
                self.logger.info("Feature processing - training data - too short for the minimum split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)
        train_y = [np.array([y[0].values for y in train_y]), np.array([y[1].iloc[:, 1].values for y in train_y])]
        valid_y = [np.array([y[0].values for y in valid_y]), np.array([y[1].iloc[:, 1].values for y in valid_y])]
        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])
        return train_x, valid_x, train_y, valid_y
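
    # get_predict_data concatenates one window batch per input segment; the result is a
    # float array of shape (n_windows, time_step * time_series, n_features).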
    def get_predict_data(self, dfs, time_series=1):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"] * time_series:
                self.logger.info("Feature processing - prediction data - shorter than time_step")
                continue
            datax = self.get_predict_features(df, time_series)
            test_x.append(datax)
        test_x = np.concatenate(test_x, axis=0)
        return test_x
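
    # Worked example of the chunking below: with 100 rows, time_step=16 and time_series=1,
    # iters = 100 // 16 = 6 full windows and end = 100 % 16 = 4 leftover rows; the 4-row tail
    # is padded by repeating its last row 12 times, yielding a 7th window of length 16.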
    def get_predict_features(self, norm_data, time_series=1):
        """
        Split the data into equal-sized windows to build the prediction set
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.loc[:, self.opt.features].reset_index(drop=True)
        time_step *= int(time_series)
        time_step_loc = time_step - 1
        iters = int(len(feature_data)) // time_step
        end = int(len(feature_data)) % time_step
        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)])
        if end > 0:
            df = feature_data.tail(end)
            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[-1]] * (time_step - end))]).reset_index(drop=True)
            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
        return features_x
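
    # The sliding-window slicers below produce train_num - time_step * time_series + 1
    # overlapping windows (stride 1), so consecutive samples share most of their rows.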
    def get_timestep_features(self, norm_data, col_time, target, is_train, time_series=1):
        """
        Slice the data with a sliding window to build the time-series training set
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step*time_series - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target]  # the same columns are used whether or not is_train is True
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]  # database field 'C_T': 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
        features_x, features_y = [], []
        for i, row in enumerate(zip(nwp, labels)):
            features_x.append(row[0])
            features_y.append(row[1])
        return features_x, features_y
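
    # Zone variant: zone columns are removed from the inputs and returned as a second label,
    # so each y sample is a pair [zone label frame, power label frame].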
    def get_timestep_features_zone(self, norm_data, col_time, target, is_train, time_series):
        """
        Slice the data with a sliding window for zone-wise modelling
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step*time_series - 1
        train_num = int(len(feature_data))
        label_features_power = [col_time, target]  # the same columns are used whether or not is_train is True
        label_features_zone = self.opt.zone
        nwp_cs = [x for x in self.opt.features if x not in self.opt.zone]
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
        labels_power = [feature_data.loc[i:i + time_step_loc, label_features_power].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
        labels_zone = [feature_data.loc[i:i + time_step_loc, label_features_zone].reset_index(drop=True) for i in range(train_num - time_step*time_series + 1)]
        features_x, features_y = [], []
        for i, row in enumerate(zip(nwp, labels_power, labels_zone)):
            features_x.append(row[0])
            features_y.append([row[2], row[1]])
        return features_x, features_y
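
    # Gap handling: intervals longer than one 15-minute step but shorter than
    # 15 * Model['how_long_fill'] minutes are interpolated; anything longer splits the data
    # into separate training segments.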
    def fill_train_data(self, unite, col_time):
        """
        Fill missing values
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
        self.logger.info("Re-checked; total number of points to interpolate: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train
    def fill_pre_data(self, unite):
        unite = unite.interpolate(method='linear')  # linear interpolation of the NWP first
        unite = unite.ffill().bfill()  # then forward/backward fill the edge points interpolation cannot reach
        return unite
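
    # missing_time_splite walks the 'time_diff' column, logs each gap, and returns a list of
    # contiguous segments; the ':-1' column slice drops the helper 'time_diff' column.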
    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points that need filling: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, gaps in the time series: {n_short}, of which long gaps: {n_long}")
        self.logger.info("Total points that need filling: {}".format(n_points))
        return dfs
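
    # data_fill resamples each segment onto a strict 15-minute grid; set_index(col_time) gives
    # resample() the DatetimeIndex it requires, and the row-count delta is the number of
    # interpolated points.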
    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for i, df in enumerate(dfs):
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15min').interpolate(method='linear')  # linear interpolation; other fill methods still need to be compared
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {}: {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments; {} points were filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill
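
    # train_valid_split holds out the last valid_rate share of samples; with shuffle=False the
    # validation set is therefore the chronological tail of the windows.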
    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        if valid_size == 0:  # guard: slicing with -0 would put every sample into the validation set
            return list(datax), [], list(datay), []
        valid_index = set(indexs[-valid_size:])
        train_index = set(indexs[:-valid_size])
        tx, vx, ty, vy = [], [], [], []
        for i, data in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(data[0])
                ty.append(data[1])
            elif i in valid_index:
                vx.append(data[0])
                vy.append(data[1])
        return tx, vx, ty, vy
    def train_data_handler(self, data, time_series=1):
        """
        Training data preprocessing:
        cleaning + gap filling + normalisation
        Args:
            data: data loaded from MongoDB
            opt: namespace of parameters
        return:
            x_train
            x_valid
            y_train
            y_valid
        """
        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
        # drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # select the feature columns, convert to numeric, sort by time
        train_data = data[[col_time] + features + [target] + self.opt.zone]
        train_data = train_data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        train_data = train_data.sort_values(by=col_time)
        # drop days whose average feature missing rate exceeds 20%
        # train_data = missing_features(train_data, features, col_time)
        # feature preprocessing of the data with curtailment records removed:
        # 1. clean NaNs and outliers
        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time)
        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
        # 2. scaling
        # create scalers for the features and the target
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        # scale the features and the target
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features+self.opt.zone])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0, 0]
        train_data_cleaned[self.opt.features+self.opt.zone] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        # 3. fill missing values
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # serialise both scalers
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # reset the pointer to the beginning of the byte stream
        scaled_target_bytes.seek(0)
        train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target, time_series)
        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
    def pre_data_handler(self, data, feature_scaler, time_series=1):
        """
        Light preprocessing of the prediction data
        Args:
            data: data loaded from MongoDB
            opt: namespace of parameters
        return:
            pre_x: normalised feature windows
        """
        # drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
        # args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
        col_time, features = self.opt.col_time, self.opt.features
        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
        if not set(features).issubset(set(data.columns.tolist())):
            raise ValueError("The prediction data do not contain the features the model expects!")
        pre_data = data[features].copy()
        pre_data[self.opt.zone] = 1  # placeholder zone columns so the column layout matches the fitted scaler
        if self.opt.Model['predict_data_fill']:
            pre_data = self.fill_pre_data(pre_data)
        scaled_pre_data = feature_scaler.transform(pre_data)[:, :len(features)]  # keep only the feature columns
        pre_data.drop(columns=self.opt.zone, inplace=True)
        pre_data.loc[:, features] = scaled_pre_data
        pre_x = self.get_predict_data([pre_data], time_series)
        return pre_x, data
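

# A minimal usage sketch (illustrative only): the config keys mirror the attributes the class
# reads (Model, features, zone, col_time, target, cap), but the values and column names here
# are invented for the example, and the common.* imports at the top of this file must resolve
# for the module to load at all.
if __name__ == "__main__":
    import logging
    logging.basicConfig(level=logging.INFO)
    args = {
        "Model": {"time_step": 16, "valid_data_rate": 0.2, "shuffle_train_data": False,
                  "how_long_fill": 10, "train_data_fill": True, "predict_data_fill": True},
        "features": ["C_WS170"],   # hypothetical NWP feature column
        "zone": ["zone_1"],        # hypothetical zone label column
        "col_time": "C_TIME",      # hypothetical timestamp column
        "target": "C_REAL_VALUE",  # hypothetical power column
        "cap": 50.0,
    }
    handler = CustomDataHandler(logging.getLogger("custom_data_handler"), args)
    # 100 synthetic rows -> 6 full 16-step windows plus one padded tail window
    demo = pd.DataFrame({"C_WS170": np.linspace(0.0, 1.0, 100)})
    print(handler.get_predict_features(demo).shape)  # (7, 16, 1)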