#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :data_handler.py
# @Time :2025/1/8 14:56
# @Author :David
# @Company: shenyang JY
import argparse
import numbers

import joblib
import numpy as np
import pandas as pd
from io import BytesIO
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler

from app.common.data_cleaning import *  # noqa: F403 -- expected to provide cleaning() and rm_duplicated()


class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)

    def get_train_data(self, dfs, col_time, target):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - training data - shorter than time_step")
                continue
            datax, datay = self.get_timestep_features(df, col_time, target, is_train=True)
            if len(datax) < 10:
                self.logger.info("Feature processing - training data - too few windows for the minimum split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"],
                                                    shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)
        # Stack the per-window frames into arrays: labels keep only the target column (index 1).
        train_y = np.array([y.iloc[:, 1].values for y in train_y])
        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])
        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])
        return train_x, valid_x, train_y, valid_y

    def get_predict_data(self, dfs, features):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - prediction data - shorter than time_step")
                continue
            datax = self.get_predict_features(df, features)
            test_x.extend(datax)
        test_x = np.array(test_x)
        return test_x

    def get_predict_features(self, norm_data, features):
        """
        Split the data into equal-length chunks to build the prediction set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        iters = len(feature_data) // time_step
        end = len(feature_data) % time_step
        features_x = np.array([feature_data.loc[i * time_step:i * time_step + time_step_loc, features].reset_index(drop=True)
                               for i in range(iters)])
        if end > 0:
            # Pad the leftover tail to a full window by repeating its first row.
            df = feature_data.tail(end)[features]
            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[0]] * (time_step - end))]).reset_index(drop=True)
            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
        return features_x
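    # Worked example of the chunking above (illustrative sizes, not from config):
    # with len(feature_data) == 100 and time_step == 16, iters == 6 full windows
    # cover rows 0..95; the end == 4 leftover rows are padded to 16 by repeating
    # the tail's first row, so features_x.shape == (7, 16, len(features)).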

    def get_timestep_features(self, norm_data, col_time, target, is_train):
        """
        Split the data with a sliding window to build the time-series training set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = len(feature_data)
        label_features = [col_time, target]  # both branches of the original is_train check were identical
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True)
               for i in range(train_num - time_step + 1)]  # DB field 'C_T': 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True)
                  for i in range(train_num - time_step + 1)]
        features_x, features_y = list(nwp), list(labels)
        return features_x, features_y
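    # Sliding-window sanity check (hypothetical sizes): with train_num == 20 and
    # time_step == 16 the comprehensions above yield 20 - 16 + 1 == 5 windows,
    # each pairing a (16, len(nwp_cs)) feature frame with a (16, 2) label frame.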

    def fill_train_data(self, unite, col_time):
        """
        Fill in missing time points.
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum() / (15 * 60) - len(miss_points)
        self.logger.info("Re-check: total number of points that need interpolation: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train

    def fill_pre_data(self, unite):
        unite = unite.interpolate(method='linear')  # fill the NWP columns linearly first
        unite = unite.ffill().bfill()  # then close the edge points interpolation cannot reach
        return unite
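    # Fill-order sketch (hypothetical series): interpolate() turns
    # [1.0, NaN, 3.0, NaN] into [1.0, 2.0, 3.0, 3.0], while a leading NaN as in
    # [NaN, 2.0, ...] survives linear interpolation and is only closed by bfill().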

    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points that need filling: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, gaps in the series: {n_short}, of which long gaps: {n_long}")
        self.logger.info("Total points that need filling: {}".format(n_points))
        return dfs
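    # Gap arithmetic used above (illustrative): a 60-minute hole in a 15-minute
    # series gives points == 3600 / 900 - 1 == 3, i.e. the three missing stamps
    # between the two recorded neighbours.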

    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for df in dfs:
            df = rm_duplicated(df, self.logger, col_time=col_time)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15min').interpolate(method='linear')  # linear filling; other methods would need a separate comparison
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments; {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill
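    # Resample sketch (hypothetical segment): rows stamped 00:00 and 00:45 expand
    # under resample('15min') to 00:00/00:15/00:30/00:45, so len(dff) == 4,
    # len(df1) == 2 and points == 2 linearly interpolated values.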

    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        # Guard against valid_size == 0: indexs[-0:] would otherwise select everything.
        valid_index = set(indexs[-valid_size:]) if valid_size > 0 else set()
        train_index = set(indexs[:-valid_size]) if valid_size > 0 else set(indexs)
        tx, vx, ty, vy = [], [], [], []
        for i, data in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(data[0])
                ty.append(data[1])
            elif i in valid_index:
                vx.append(data[0])
                vy.append(data[1])
        return tx, vx, ty, vy
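    # Split-size example (illustrative): with len(datax) == 100 and
    # valid_rate == 0.2, valid_size == 20, so the last 20 indices of the
    # (optionally shuffled) permutation go to validation and 80 to training.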

    def train_data_handler(self, data, bp_data=False):
        """
        Training-data preprocessing:
        cleaning + gap filling + normalization
        Args:
            data: data loaded from mongo
        return:
            x_train
            x_valid
            y_train
            y_valid
        """
        col_time, target = self.opt.col_time, self.opt.target
        features = self.opt.features_owner if self.opt.switch_nwp_owner else self.opt.features
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # Select features, coerce to numeric, sort by time
        train_data = data[[col_time] + features + [target]]
        train_data = train_data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128)
                                    else float(x) if isinstance(x, numbers.Number) else x)
        train_data = train_data.sort_values(by=col_time)
        # Feature preprocessing on the curtailment-cleaned data:
        # 1. clean NaNs and outliers
        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time=col_time)
        # 2. normalization
        # build separate scalers for the features and the target
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        # fit and apply them; cleaning may have dropped feature columns, so re-intersect
        self.opt.features = list(set(train_data_cleaned.columns.tolist()) & set(features))
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0, 0]
        train_data_cleaned[self.opt.features] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        # 3. fill missing points
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # serialize both scalers
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # reset the pointer to the beginning of the byte stream
        scaled_target_bytes.seek(0)
        if bp_data:
            train_data = pd.concat(train_datas, axis=0)
            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[features].values, train_data[target].values,
                                                                        valid_rate=self.opt.Model["valid_data_rate"],
                                                                        shuffle=self.opt.Model['shuffle_train_data'])
            train_x, valid_x, train_y, valid_y = np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
        else:
            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target)
        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
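    # Scaling sketch (illustrative numbers only): if the cleaned target spans
    # 0..200 MW and self.opt.cap == 200, the target scaler maps it onto 0..1,
    # scaled_cap becomes 1.0, and inverse_transform on target_scaler restores
    # the original MW scale.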

    def pre_data_handler(self, data, feature_scaler, bp_data=False):
        """
        Light preprocessing for prediction data
        Args:
            data: data loaded from mongo
        return:
            scaled_features: normalized features
        """
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        col_time = self.opt.col_time
        features = self.opt.features_owner if self.opt.switch_nwp_owner else self.opt.features
        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128)
                        else float(x) if isinstance(x, numbers.Number) else x)
        data = data.sort_values(by=col_time).reset_index(drop=True)
        if not set(features).issubset(set(data.columns.tolist())):
            raise ValueError("The prediction data does not contain the features the model expects!")
        pre_data = data[features].copy()
        if self.opt.Model['predict_data_fill']:
            pre_data = self.fill_pre_data(pre_data)
        pre_data.loc[:, features] = feature_scaler.transform(pre_data)
        if bp_data:
            pre_x = np.array(pre_data)
        else:
            pre_x = self.get_predict_data([pre_data], features)
        return pre_x, data
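    # Shape sketch (hypothetical sizes): with 300 prediction rows and
    # time_step == 16, get_predict_data returns pre_x of shape
    # (19, 16, len(features)): 18 full windows plus one padded tail window.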


def write_number_to_file(file_path, line_number, number, mode='overwrite'):
    """
    Write a number to the given file (supports overwrite-one-line and wipe-and-rewrite modes)

    Args:
        file_path (str): path to the file
        line_number (int): line number (1-based)
        number (int/float): the number to write
        mode (str): write mode - 'overwrite' replaces the given line, 'rewrite' wipes the file first

    Returns:
        bool: whether the operation succeeded

    Examples:
        write_number_to_file("data.txt", 3, 42.5)           # overwrite mode
        write_number_to_file("data.txt", 2, 99, 'rewrite')  # wipe-and-rewrite mode
    """
    try:
        # validate arguments
        if line_number < 1:
            raise ValueError("line_number must be >= 1")
        if mode not in ['overwrite', 'rewrite']:
            raise ValueError("mode must be 'overwrite' or 'rewrite'")
        # render the number as a newline-terminated string
        number_str = f"{number}\n"
        target_index = line_number - 1
        if mode == 'rewrite':
            # build fresh content from scratch
            lines = ['\n'] * target_index  # leading blank lines as padding
            lines.append(number_str)
        else:
            # overwrite mode: start from the existing content
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
            except FileNotFoundError:
                lines = []
            # pad with blank lines up to the target position
            while len(lines) <= target_index:
                lines.append('\n')
            # replace the target line
            lines[target_index] = number_str
        # write everything back
        with open(file_path, 'w', encoding='utf-8') as f:
            f.writelines(lines)
        return True
    except Exception as e:
        print(f"Operation failed: {str(e)}")
        return False
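

if __name__ == "__main__":
    # Minimal self-check for write_number_to_file; the temp-file name below is
    # illustrative only, and this block is a usage sketch, not part of the API.
    import os
    import tempfile

    demo_path = os.path.join(tempfile.gettempdir(), "demo_numbers.txt")
    assert write_number_to_file(demo_path, 3, 42.5)           # pads to 3 lines, writes "42.5" on line 3
    assert write_number_to_file(demo_path, 2, 99, 'rewrite')  # wipes the file: one blank line, then "99"
    with open(demo_path, encoding='utf-8') as f:
        print(f.read())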