data_handler.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :data_handler.py
# @Time :2025/1/8 14:56
# @Author :David
# @Company: shenyang JY
import argparse
import numbers

import joblib
import numpy as np  # np is used throughout; imported explicitly instead of relying on the star import below
import pandas as pd
from io import BytesIO
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler

from common.processing_data_common import missing_features, str_to_list
from common.data_cleaning import *


class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)

    def get_train_data(self, dfs, col_time, features, target):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - training data - segment shorter than time_step, skipped")
                continue  # too short to form even one window
            datax, datay = self.get_timestep_features(df, col_time, features, target, is_train=True)
            if len(datax) < 10:
                self.logger.info("Feature processing - training data - too few windows for the minimum split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)

        # Column 1 of each label frame is the target (column 0 is the time column)
        train_y = np.array([y.iloc[:, 1].values for y in train_y])
        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])
        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])
        # Dual-input variant, kept for reference:
        # train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
        # valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]
        return train_x, valid_x, train_y, valid_y
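
    # Resulting shapes (inferred from the code above): train_x/valid_x are
    # (n_windows, time_step, n_features) arrays; train_y/valid_y are
    # (n_windows, time_step) arrays holding only the target column of each
    # label frame.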

    def get_predict_data(self, dfs, features):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - prediction data - segment shorter than time_step, skipped")
                continue
            datax = self.get_predict_features(df, features)
            test_x.extend(datax)
        # get_predict_features already yields (time_step, n_features) windows,
        # so just stack them; the old x[0]/x[1] indexing belonged to the
        # commented-out dual-input variant and fails on single-input windows.
        test_x = np.array(test_x)
        return test_x

    def get_predict_features(self, norm_data, features):
        """
        Split the data into equal, non-overlapping chunks to build the prediction set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        iters = len(feature_data) // time_step  # was float division, which breaks range(); trailing remainder rows are dropped
        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, features].reset_index(drop=True) for i in range(iters)])
        return features_x
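
    # Illustration: with time_step = 16, a 100-row segment gives iters = 6
    # non-overlapping chunks (96 rows used, the trailing 4 dropped), so
    # features_x has shape (6, 16, len(features)).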

    def get_timestep_features(self, norm_data, col_time, features, target, is_train):
        """
        Slide a fixed-length window over the data to build the time-series training set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = len(feature_data)
        label_features = [col_time, target]  # both branches of the old is_train switch were identical
        nwp_cs = features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database field 'C_T': 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
        features_x, features_y = [], []
        for nwp_window, label_window in zip(nwp, labels):
            features_x.append(nwp_window)
            features_y.append(label_window)
        return features_x, features_y
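
    # Illustration: with time_step = 16, a 100-row segment yields
    # 100 - 16 + 1 = 85 overlapping windows (stride 1), unlike the
    # non-overlapping chunks used for prediction above.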

    def fill_train_data(self, unite, col_time):
        """
        Fill missing points in the training series.
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum() / (15 * 60) - len(miss_points)
        self.logger.info("Re-check: total number of points that need interpolation: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train

    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to fill: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, gaps in the series: {n_short}, of which long gaps: {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs
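
    # Illustration: a 60-minute gap between consecutive rows is
    # 3600 s / 900 s - 1 = 3 missing 15-minute points; gaps of at least
    # dt_long split the frame into separate segments instead of being
    # counted for interpolation.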

    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for i, df in enumerate(dfs):
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation for now; other methods still need comparison
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} filled in.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments; {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill
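
    # Note: the '15T' offset alias is deprecated in newer pandas releases in
    # favour of '15min'; both denote a 15-minute resampling frequency.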

    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        # Guard the valid_size == 0 edge case: indexs[-0:] is the whole list,
        # which would silently put every sample into the validation set.
        valid_index = set(indexs[-valid_size:]) if valid_size > 0 else set()
        train_index = set(indexs[:-valid_size]) if valid_size > 0 else set(indexs)
        tx, vx, ty, vy = [], [], [], []
        for i, (x, y) in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(x)
                ty.append(y)
            elif i in valid_index:
                vx.append(x)
                vy.append(y)
        return tx, vx, ty, vy
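
    # Design note: only the index list is permuted, so windows keep their
    # original relative order within each bucket; sets make the per-sample
    # membership checks O(1) instead of O(n) list scans.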

    def train_data_handler(self, data, opt):
        """
        Training-data preprocessing:
        cleaning + gap filling + normalization
        Args:
            data: data loaded from mongo
            opt: parameter namespace
        return:
            x_train
            x_valid
            y_train
            y_valid
        """
        col_time, features, target = opt.col_time, opt.features, opt.target
        # Drop rows already flagged as curtailment records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # Select feature columns and convert them to numeric values
        train_data = data[[col_time] + features + [target]]
        # Drop days whose average feature missing rate exceeds 20%
        # train_data = missing_features(train_data, features, col_time)
        train_data = train_data.sort_values(by=col_time)
        # train_data = train_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
        # Feature preprocessing after curtailment cleaning: 1. drop null/outlier rows 2. fill missing values
        train_data_cleaned = key_field_row_cleaning(train_data, features + [target], self.logger)
        train_data_cleaned = train_data_cleaned.applymap(  # pandas >= 2.1 renames applymap to DataFrame.map
            lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        # Build the feature and target scalers
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        # Normalize features and target; fit the target scaler on the cleaned
        # frame (not the raw one) so the row counts line up on assignment
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        train_data_cleaned[features] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # Serialize the two fitted scalers (the old code dumped the scaled
        # arrays, which cannot be reused as scalers at prediction time)
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # reset the pointer to the beginning of the byte stream
        scaled_target_bytes.seek(0)
        train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
        return train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes

    def pre_data_handler(self, data, feature_scaler, args):
        """
        Light preprocessing for prediction data
        Args:
            data: data loaded from mongo
            feature_scaler: fitted feature scaler from training
            args: parameter dict
        return:
            pre_x: windowed, normalized feature arrays
        """
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        features = str_to_list(args['features'])
        time_steps = int(args['time_steps'])
        col_time = args['col_time']
        model_name = args['model_name']
        col_reserve = str_to_list(args['col_reserve'])
        pre_data = data.sort_values(by=col_time).reset_index(drop=True)
        # transform() returns a bare ndarray, so write it back into the frame;
        # the downstream windowing indexes by the feature column names
        pre_data[features] = feature_scaler.transform(pre_data[features])
        pre_x = self.get_predict_data([pre_data], features)
        return pre_x
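

# A minimal usage sketch, not part of the original module: it assumes the
# `common` package is importable and uses hypothetical column names
# (C_TIME / C_WS170 / C_REAL_VALUE); the args keys mirror how self.opt.Model
# is read in the methods above.
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("data_handler")
    args = {
        "Model": {
            "time_step": 16,
            "valid_data_rate": 0.2,
            "shuffle_train_data": True,
            "how_long_fill": 10,
            "train_data_fill": True,
        },
    }
    dh = DataHandler(logger, args)
    # Dummy 15-minute series just to exercise the windowing and the split
    n = 200
    df = pd.DataFrame({
        "C_TIME": pd.date_range("2025-01-01", periods=n, freq="15min"),
        "C_WS170": np.random.rand(n),
        "C_REAL_VALUE": np.random.rand(n),
    })
    datax, datay = dh.get_timestep_features(df, "C_TIME", ["C_WS170"], "C_REAL_VALUE", is_train=True)
    tx, vx, ty, vy = dh.train_valid_split(datax, datay, valid_rate=0.2, shuffle=True)
    logger.info("windows: %s train / %s valid", len(tx), len(vx))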