#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @FileName :data_handler.py
# @Time     :2025/1/8 14:56
# @Author   :David
# @Company  :shenyang JY
import argparse, numbers, joblib
import numpy as np
import pandas as pd
from io import BytesIO
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler
from common.processing_data_common import missing_features, str_to_list
from common.data_cleaning import *


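# DataHandler bundles the feature-engineering pipeline shared by training and
# prediction: dropping curtailment records, cleaning, 15-minute gap filling,
# MinMax scaling, and sliding-window slicing driven by Model["time_step"].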
class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)
    def get_train_data(self, dfs, col_time, features, target):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - training data - fewer rows than time_step")
            datax, datay = self.get_timestep_features(df, col_time, features, target, is_train=True)
            if len(datax) < 10:
                self.logger.info("Feature processing - training data - too few windows for a train/valid split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)

        # Labels keep only the target column (index 1 of [col_time, target]); features keep all columns.
        train_y = np.concatenate(np.array([[y.iloc[:, 1].values for y in train_y]]), axis=0)
        valid_y = np.concatenate(np.array([[y.iloc[:, 1].values for y in valid_y]]), axis=0)

        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])

        return train_x, valid_x, train_y, valid_y
    def get_predict_data(self, dfs, features):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - prediction data - fewer rows than time_step")
                continue
            datax = self.get_predict_features(df, features)
            test_x.append(datax)
        test_x = np.concatenate(test_x, axis=0)
        return test_x
    def get_predict_features(self, norm_data, features):
        """
        Split the data into equal, non-overlapping chunks to build the prediction dataset.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        iters = int(len(feature_data)) // time_step
        end = int(len(feature_data)) % time_step
        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, features].reset_index(drop=True) for i in range(iters)])
        if end > 0:
            # Pad the leftover tail rows up to time_step by repeating their first row.
            df = feature_data.tail(end)
            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[0]] * (time_step - end))]).reset_index(drop=True)
            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
        return features_x
    def get_timestep_features(self, norm_data, col_time, features, target, is_train):
        """
        Split the data with a sliding window of length time_step to build the time-series training set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target]  # the same columns are used whether or not this is training data
        nwp_cs = features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T', 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
        features_x, features_y = [], []
        for i, row in enumerate(zip(nwp, labels)):
            features_x.append(row[0])
            features_y.append(row[1])
        return features_x, features_y
    def fill_train_data(self, unite, col_time):
        """
        Fill in missing time points.
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
        self.logger.info("Re-checked: total number of points to interpolate: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train
    def fill_pre_data(self, unite):
        unite = unite.interpolate(method='linear')  # linear interpolation of the NWP features first
        unite = unite.fillna(method='ffill')  # then forward/backward fill the edge points interpolation cannot reach
        unite = unite.fillna(method='bfill')
        return unite
    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                # A gap of dt_long or more starts a new segment instead of being interpolated.
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to fill: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, gaps in the time series: {n_short}, of which long gaps: {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs
    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for i, df in enumerate(dfs):
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation; other fill methods still need to be compared
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} points filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments, {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill
    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        # Guard against valid_size == 0: indexs[-0:] would put every sample in the validation set.
        valid_index = indexs[-valid_size:] if valid_size > 0 else []
        train_index = indexs[:-valid_size] if valid_size > 0 else indexs
        tx, vx, ty, vy = [], [], [], []
        for i, data in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(data[0])
                ty.append(data[1])
            elif i in valid_index:
                vx.append(data[0])
                vy.append(data[1])
        return tx, vx, ty, vy
    def train_data_handler(self, data, bp_data=False):
        """
        Training data preprocessing:
        cleaning + gap filling + normalization
        Args:
            data: data loaded from mongo
            bp_data: if True, skip the sliding-window slicing and return flat samples
        return:
            x_train
            x_valid
            y_train
            y_valid
        """
        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # Select the relevant columns, convert them to numeric values and sort by time
        train_data = data[[col_time] + features + [target]]
        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        train_data = train_data.sort_values(by=col_time)
        # Drop days whose average feature missing rate exceeds 20%
        # train_data = missing_features(train_data, features, col_time)
        # Feature preprocessing after curtailment cleaning:
        # 1. Clean NaNs and outliers
        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time)
        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
        # 2. Normalization
        # Create separate scalers for the features and the target
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        # Scale the features and the target
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        scaled_cap = target_scaler.transform(np.array([[self.opt.cap]]))[0, 0]
        train_data_cleaned[features] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        # 3. Fill missing time points
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # Serialize both scalers
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
        scaled_target_bytes.seek(0)

        if bp_data:
            train_data = pd.concat(train_datas, axis=0)
            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x, valid_x, train_y, valid_y = np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
        else:
            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
    def pre_data_handler(self, data, feature_scaler, bp_data=False):
        """
        Simple preprocessing of the prediction data
        Args:
            data: data loaded from mongo
            feature_scaler: the fitted feature scaler produced during training
        return:
            pre_x: the normalized feature samples
            data: the numeric, time-sorted source data
        """
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
        #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
        col_time, features = self.opt.col_time, self.opt.features
        data = data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
        if self.opt.Model['predict_data_fill']:
            data = self.fill_pre_data(data)
        pre_data = data[features].copy()  # copy so scaling does not write back into `data`
        scaled_features = feature_scaler.transform(data[features])
        pre_data.loc[:, features] = scaled_features
        if bp_data:
            pre_x = np.array(pre_data)
        else:
            pre_x = self.get_predict_data([pre_data], features)
        return pre_x, data
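

# A minimal usage sketch, not part of the original module: it assumes the
# `common.*` helpers are importable and uses hypothetical column names
# ('C_TIME', 'C_WS170', 'C_REAL_VALUE'); the config keys mirror the ones this
# class reads (Model["time_step"], Model["valid_data_rate"], etc.).
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    args = {
        "col_time": "C_TIME",          # timestamp column (hypothetical name)
        "features": ["C_WS170"],       # NWP feature columns (hypothetical names)
        "target": "C_REAL_VALUE",      # measured power column (hypothetical name)
        "cap": 100.0,                  # installed capacity used to compute scaled_cap
        "Model": {
            "time_step": 16,           # sliding-window length
            "valid_data_rate": 0.2,    # share of windows kept for validation
            "shuffle_train_data": True,
            "how_long_fill": 10,       # gaps of 10*15 min or more start a new segment
            "train_data_fill": True,
            "predict_data_fill": True,
        },
    }
    dh = DataHandler(logging.getLogger("data_handler"), args)
    # train_df would normally come from mongo; train_data_handler then returns the
    # windowed arrays plus the serialized scalers:
    # train_x, train_y, valid_x, valid_y, fs_bytes, ts_bytes, scaled_cap = dh.train_data_handler(train_df)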