data_handler.py

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :data_handler.py
# @Time :2025/1/8 14:56
# @Author :David
# @Company: shenyang JY
import argparse

import numpy as np
import pandas as pd

from common.data_cleaning import *  # provides rm_duplicated, among others


class DataHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        # Expose the config dict via attribute-style access (self.opt.Model[...]).
        self.opt = argparse.Namespace(**args)

    def get_train_data(self, dfs, col_time, features, target):
        train_x, valid_x, train_y, valid_y = [], [], [], []
        for df in dfs:
            if len(df) < self.opt.Model["time_step"]:
                self.logger.info("Feature prep - training data - fewer rows than time_step")
                continue
            datax, datay = self.get_timestep_features(df, col_time, features, target, is_train=True)
            if len(datax) < 10:
                self.logger.info("Feature prep - training data - too few windows for the minimum split")
                continue
            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
            train_x.extend(tx)
            valid_x.extend(vx)
            train_y.extend(ty)
            valid_y.extend(vy)

        # Stack the per-window DataFrames into arrays; labels keep only their
        # second column (the target), dropping the time column.
        train_y = np.array([y.iloc[:, 1].values for y in train_y])
        valid_y = np.array([y.iloc[:, 1].values for y in valid_y])

        train_x = np.array([x.values for x in train_x])
        valid_x = np.array([x.values for x in valid_x])
        # train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
        # valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]

        return train_x, valid_x, train_y, valid_y

    def get_timestep_features(self, norm_data, col_time, features, target, is_train):  # sliding-window construction, vectorised with pandas
        time_step = self.opt.Model["time_step"]
        feature_data = norm_data.reset_index(drop=True)
        time_step_loc = time_step - 1
        train_num = int(len(feature_data))
        label_features = [col_time, target]  # both branches of the original is_train check were identical
        nwp_cs = features
        nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T': 'C_WS170'
        labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
        features_x, features_y = [], []
        for nwp_window, label_window in zip(nwp, labels):
            features_x.append(nwp_window)
            features_y.append(label_window)
        return features_x, features_y
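
    # A rough shape check for the sliding windows above (illustrative
    # numbers, not from the original source): a frame with N rows and
    # time_step T yields N - T + 1 overlapping windows, e.g.
    #   96 rows, time_step=16  ->  81 windows of shape (16, len(features)),
    # which get_train_data later stacks into a (81, 16, n_features) array.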

    def fill_train_data(self, unite, col_time):
        unite[col_time] = pd.to_datetime(unite[col_time])
        unite['time_diff'] = unite[col_time].diff()
        dt_short = pd.Timedelta(minutes=15)
        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
        miss_number = miss_points['time_diff'].dt.total_seconds().sum() / (15 * 60) - len(miss_points)
        self.logger.info("Re-checked: total points needing interpolation: {}".format(miss_number))
        if miss_number > 0 and self.opt.Model["train_data_fill"]:
            data_train = self.data_fill(data_train, col_time)
        return data_train
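
    # How the two thresholds interact (values illustrative): with the
    # 15-minute cadence and how_long_fill=4, dt_long is 60 minutes, so a
    # 30-minute gap is interpolated by data_fill, while a 90-minute gap
    # splits the data into separate training segments instead.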

    def missing_time_splite(self, df, dt_short, dt_long, col_time):
        df.reset_index(drop=True, inplace=True)
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]  # drop the helper time_diff column
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                points = df['time_diff'].dt.total_seconds()[i] / (60 * 15) - 1
                self.logger.info("Missing points: {}".format(points))
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    self.logger.info("Points to be filled: {}".format(points))
        dfs.append(df.iloc[start_index:, :-1])
        self.logger.info(f"Total rows: {len(df)}, short gaps in the series: {n_short}, long gaps: {n_long}")
        self.logger.info("Total points to fill: {}".format(n_points))
        return dfs

    def data_fill(self, dfs, col_time, test=False):
        dfs_fill, inserts = [], 0
        for df in dfs:
            df = rm_duplicated(df, self.logger)
            df1 = df.set_index(col_time, inplace=False)
            # Linear interpolation; other fill methods still need a comparison.
            # ('15min' replaces the deprecated '15T' alias.)
            dff = df1.resample('15min').interpolate(method='linear')
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            dfs_fill.append(dff)
            self.logger.info("{} ~ {} has {} points, {} filled.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
            inserts += points
        name = "test data" if test is True else "training set"
        self.logger.info("{} split into {} segments; {} points filled in total".format(name, len(dfs_fill), inserts))
        return dfs_fill
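
    # The fill step in isolation (a minimal sketch with made-up values):
    #   idx = pd.date_range("2025-01-01 00:00", periods=2, freq="30min")
    #   seg = pd.DataFrame({"v": [0.0, 1.0]}, index=idx)
    #   seg.resample("15min").interpolate(method="linear")
    #   # adds the 00:15 row with v = 0.5, i.e. one filled point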

    def train_valid_split(self, datax, datay, valid_rate, shuffle):
        shuffle_index = np.random.permutation(len(datax))
        indexs = shuffle_index.tolist() if shuffle else np.arange(0, len(datax)).tolist()
        valid_size = int(len(datax) * valid_rate)
        # Guard the valid_size == 0 edge case: indexs[-0:] would return the
        # whole list as the validation set and leave the training set empty.
        valid_index = set(indexs[-valid_size:]) if valid_size > 0 else set()
        train_index = set(indexs[:-valid_size]) if valid_size > 0 else set(indexs)
        tx, vx, ty, vy = [], [], [], []
        for i, (x, y) in enumerate(zip(datax, datay)):
            if i in train_index:
                tx.append(x)
                ty.append(y)
            elif i in valid_index:
                vx.append(x)
                vy.append(y)
        return tx, vx, ty, vy
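

# ----------------------------------------------------------------------
# Minimal usage sketch (not part of the original module). The Model keys
# mirror the ones DataHandler reads; the column names ("C_TIME",
# "C_WS170", "C_POWER") and all values are made-up placeholders.
if __name__ == "__main__":
    import logging

    logging.basicConfig(level=logging.INFO)
    demo_logger = logging.getLogger("data_handler_demo")
    demo_args = {
        "Model": {
            "time_step": 16,
            "valid_data_rate": 0.2,
            "shuffle_train_data": False,
            "how_long_fill": 4,
            "train_data_fill": True,
        }
    }
    handler = DataHandler(demo_logger, demo_args)

    # One synthetic, gap-free 15-minute day: a time column, one NWP
    # feature and one target column.
    times = pd.date_range("2025-01-01", periods=96, freq="15min")
    demo_df = pd.DataFrame({
        "C_TIME": times,
        "C_WS170": np.random.rand(96),
        "C_POWER": np.random.rand(96),
    })
    tx, vx, ty, vy = handler.get_train_data(
        [demo_df], col_time="C_TIME", features=["C_WS170"], target="C_POWER")
    demo_logger.info("train_x %s, valid_x %s", tx.shape, vx.shape)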