data_handler.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :data_handler.py
# @Time :2025/1/8 14:56
# @Author :David
# @Company: shenyang JY
import argparse
import numbers
import joblib
import numpy as np
import pandas as pd
from io import BytesIO
from bson.decimal128 import Decimal128
from sklearn.preprocessing import MinMaxScaler
from app.common.data_cleaning import *


class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = args.parse_args_and_yaml()
    def get_train_data(self, dfs, col_time, target):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - training data - segment shorter than time_step, skipping")
                continue
            datax, datay = self.get_timestep_features(df, col_time, target, is_train=True)
            if len(datax) < 10:
                self.logger.info("Feature processing - training data - too few windows for the minimum split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)
        # Each label window is a (time_step, 2) frame of [col_time, target];
        # keep only the target column (index 1) as the label sequence.
        train_y = np.array([y.iloc[:, 1].values for y in train_y])
        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])
        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])
        return train_x, valid_x, train_y, valid_y
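
    # Shape sketch (hypothetical sizes): with time_step=16 and 3 features,
    # every x window is a (16, 3) frame and every y window a 16-step target
    # sequence, so for N windows the returned arrays satisfy
    #   train_x.shape == (N, 16, 3) and train_y.shape == (N, 16).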
    def get_predict_data(self, dfs):
        test_x = []
        for i, df in enumerate(dfs, start=1):
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature processing - prediction data - segment shorter than time_step, skipping")
                continue
            datax = self.get_predict_features(df)
            test_x.append(datax)
        test_x = np.concatenate(test_x, axis=0)
        return test_x
    def get_predict_features(self, norm_data):
        """
        Split the data into equal-length windows to build the prediction set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        iters = len(feature_data) // time_step
        end = len(feature_data) % time_step
        features_x = np.array([feature_data.loc[i * time_step:i * time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)])
        if end > 0:
            # Pad the leftover tail up to a full window by repeating its first row.
            df = feature_data.tail(end)
            df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[0]] * (time_step - end))]).reset_index(drop=True)
            features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
        return features_x
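
    # Windowing arithmetic sketch (hypothetical sizes): with time_step=16 and
    # a 100-row frame, iters = 100 // 16 = 6 full windows and end = 100 % 16
    # = 4 leftover rows; the tail is padded with 12 copies of its first row,
    # giving 7 windows in total.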
    def get_timestep_features(self, norm_data, col_time, target, is_train):
        """
        Slide a time_step window over the data to build the sequence training set.
        """
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = len(feature_data)
        label_features = [col_time, target]  # identical for train and eval, so no branch on is_train
        nwp_cs = self.opt.features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T', 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
        features_x, features_y = [], []
        for nwp_window, label_window in zip(nwp, labels):
            features_x.append(nwp_window)
            features_y.append(label_window)
        return features_x, features_y
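
    # Sliding-window count sketch: train_num rows yield train_num - time_step + 1
    # overlapping (x, y) window pairs, e.g. 100 rows with time_step=16 produce
    # 85 pairs whose starts differ by one row.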
    def fill_train_data(self, unite, col_time):
        """
        Fill in missing points.
        """
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum() / (15 * 60) - len(miss_points)
        self.logger.info("Re-check: total number of points to interpolate: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train
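
    # Gap-count sketch: one 60-minute hole in a 15-minute series contributes
    # time_diff = 3600 s for a single row, so 3600 / 900 - 1 = 3 points to
    # interpolate, matching the miss_number formula above.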
    def fill_pre_data(self, unite):
        unite = unite.interpolate(method='linear')  # linearly interpolate the NWP features first
        unite = unite.ffill().bfill()  # then pad edge points that interpolation cannot reach
        return unite
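
    # Two-stage fill illustration (hypothetical series):
    #   pd.Series([np.nan, 1.0, np.nan, 3.0]).interpolate()  ->  [NaN, 1.0, 2.0, 3.0]
    # interpolate() leaves the leading NaN, which .ffill().bfill() then
    # back-fills with 1.0.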
    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to fill: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, gaps in the series: {n_short}, of which long gaps: {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs
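
    # Gap-classification sketch: with 15-minute sampling and, say,
    # how_long_fill=10, a 30-minute jump counts as a short gap to be
    # interpolated later, while a jump of 150 minutes or more closes the
    # current segment and starts a new one.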
    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for i, df in enumerate(dfs):
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            dff = df1.resample('15min').interpolate(method='linear')  # linear interpolation; other fill methods still need comparison
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "prediction data" if test is True else "training set"
        self.logger.info("{} was split into {} segments; {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill
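
    # Resample-fill sketch (hypothetical values): indexing by col_time and
    # resampling to a 15-minute grid inserts the missing rows as NaN, which
    # the linear interpolation then fills, e.g.
    #   10:00 -> 1.0, 10:30 -> 3.0  becomes  10:00 -> 1.0, 10:15 -> 2.0, 10:30 -> 3.0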
    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        # Guard against valid_size == 0: indexs[-0:] would route *all* samples
        # into the validation set. Sets make the membership tests O(1).
        valid_index = set(indexs[-valid_size:]) if valid_size > 0 else set()
        train_index = set(indexs[:-valid_size]) if valid_size > 0 else set(indexs)
        tx, vx, ty, vy = [], [], [], []
        for i, (x, y) in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(x)
                ty.append(y)
            elif i in valid_index:
                vx.append(x)
                vy.append(y)
        return tx, vx, ty, vy
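
    # Split sketch: with 100 windows and valid_rate=0.2, the last 20 positions
    # of the (optionally shuffled) index list become the validation set and the
    # first 80 the training set; with shuffle=False this holds out the
    # chronologically newest windows.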
    def train_data_handler(self, data, bp_data=False):
        """
        Training-data preprocessing:
        cleaning + gap filling + normalization
        Args:
            data: data loaded from mongo
        Returns:
            x_train, x_valid, y_train, y_valid, the serialized scalers, and the scaled capacity
        """
        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        # Select the feature columns, coerce values to numeric, sort by time
        train_data = data[[col_time] + features + [target]]
        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        train_data = train_data.sort_values(by=col_time)
        # Drop days whose average feature missing rate exceeds 20%
        # train_data = missing_features(train_data, features, col_time)
        # Feature preprocessing on the curtailment-cleaned data:
        # 1. clean NaNs and outliers
        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time=col_time)
        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
        # 2. normalization
        # build separate scalers for the features and the target
        train_scaler = MinMaxScaler(feature_range=(0, 1))
        target_scaler = MinMaxScaler(feature_range=(0, 1))
        # scale the features and the target
        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
        scaled_cap = target_scaler.transform(np.array([[float(self.opt.cap)]]))[0, 0]
        train_data_cleaned[self.opt.features] = scaled_train_data
        train_data_cleaned[[target]] = scaled_target
        # 3. fill the gaps
        train_datas = self.fill_train_data(train_data_cleaned, col_time)
        # serialize both scalers
        scaled_train_bytes = BytesIO()
        scaled_target_bytes = BytesIO()
        joblib.dump(train_scaler, scaled_train_bytes)
        joblib.dump(target_scaler, scaled_target_bytes)
        scaled_train_bytes.seek(0)  # reset the pointer to the beginning of the byte stream
        scaled_target_bytes.seek(0)
        if bp_data:
            train_data = pd.concat(train_datas, axis=0)
            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[self.opt.features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x, valid_x, train_y, valid_y = np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
        else:
            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target)
        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
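
    # Scaler round-trip sketch: the BytesIO buffers returned above can be
    # restored with joblib before prediction, e.g.
    #   feature_scaler = joblib.load(scaled_train_bytes)
    #   target_scaler = joblib.load(scaled_target_bytes)
    # and predictions go back to physical units via
    #   target_scaler.inverse_transform(pred).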
    def pre_data_handler(self, data, feature_scaler, bp_data=False):
        """
        Light preprocessing for prediction data.
        Args:
            data: data loaded from mongo
            feature_scaler: fitted feature scaler
        Returns:
            pre_x: normalized feature windows
        """
        # Drop curtailment (power-limit) records
        if 'is_limit' in data.columns:
            data = data[data['is_limit'] == False]
        col_time, features = self.opt.col_time, self.opt.features
        data = data.map(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
        data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
        if not set(features).issubset(set(data.columns.tolist())):
            raise ValueError("Prediction data do not contain the features the model expects!")
        pre_data = data[features].copy()
        if self.opt.Model['predict_data_fill']:
            pre_data = self.fill_pre_data(pre_data)
        pre_data.loc[:, features] = feature_scaler.transform(pre_data)
        if bp_data:
            pre_x = np.array(pre_data)
        else:
            pre_x = self.get_predict_data([pre_data])
        return pre_x, data
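
    # Usage sketch (hypothetical handler/frame names): given a fitted scaler,
    #   pre_x, raw = handler.pre_data_handler(df, feature_scaler)
    # returns the windowed, normalized inputs plus the cleaned raw frame for
    # joining predictions back onto their timestamps.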


def write_number_to_file(file_path, line_number, number, mode='overwrite'):
    """
    Write a number into the given file (supports an overwrite-one-line mode
    and a clear-and-rewrite mode).
    Args:
        file_path (str): path to the file
        line_number (int): line number (1-based)
        number (int/float): the number to write
        mode (str): write mode - 'overwrite' replaces the given line, 'rewrite' clears the file first
    Returns:
        bool: whether the operation succeeded
    Examples:
        write_number_to_file("data.txt", 3, 42.5)  # overwrite mode
        write_number_to_file("data.txt", 2, 99, 'rewrite')  # rewrite mode
    """
    try:
        # validate the arguments
        if line_number < 1:
            raise ValueError("line_number must be >= 1")
        if mode not in ['overwrite', 'rewrite']:
            raise ValueError("mode must be 'overwrite' or 'rewrite'")
        # render the number as a newline-terminated string
        number_str = f"{number}\n"
        target_index = line_number - 1
        # handle the two modes
        if mode == 'rewrite':
            # start from scratch: pad with blank lines up to the target line
            lines = ['\n'] * target_index
            lines.append(number_str)
        else:
            # overwrite mode: read the existing content if there is any
            try:
                with open(file_path, 'r', encoding='utf-8') as f:
                    lines = f.readlines()
            except FileNotFoundError:
                lines = []
            # extend with blank lines until the target line exists
            while len(lines) <= target_index:
                lines.append('\n')
            # replace the target line
            lines[target_index] = number_str
        # write everything back
        with open(file_path, 'w', encoding='utf-8') as f:
            f.writelines(lines)
        return True
    except Exception as e:
        print(f"Operation failed: {str(e)}")
        return False
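

if __name__ == "__main__":
    # Minimal self-test sketch for write_number_to_file, assuming only a
    # writable temp directory; "demo.txt" is a hypothetical file name.
    import os
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.txt")
        assert write_number_to_file(path, 3, 42.5)           # pads lines 1-2, writes 42.5 on line 3
        assert write_number_to_file(path, 2, 99, 'rewrite')  # clears the file, puts 99 on line 2
        with open(path, encoding='utf-8') as f:
            print(f.read())  # expect a blank line followed by "99"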