#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/17 10:10
# file: data_process.py
# author: David
# company: shenyang JY
import pandas as pd
import os
import numpy as np
from data_utils import *
import yaml


class data_process(object):
    def __init__(self, opt):
        self.std = {}
        self.mean = {}
        self.opt = opt

    # Mainly the gap-filling work done after the datasets are joined
    def get_train_data(self):
        """
        :return: the split and gap-filled training data (.npy vectors)
        """
        csv_data_path = self.opt.excel_data_path
        data_train = pd.read_csv(os.path.join(csv_data_path, self.opt.data_format["nwp"]))
        data_train['C_TIME'] = pd.to_datetime(data_train['C_TIME'])
        data_train = self.data_cleaning(data_train)
        # power['C_TIME'] = pd.to_datetime(power['C_TIME'])
        envir_cols = ['C_TIME', 'C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_AIRT', 'C_CELLT']
        # envir_cols = ['C_TIME', 'C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_OBLIQUER']
        # envir = envir.drop(labels='C_SYNC_TIME', axis=1)
        # Step 1: join the tables -- the MBD environment data should not be joined here
        # unite = pd.merge(unite, envir, on='C_TIME')
        # Step 2: compute the sampling intervals
        # data_train['C_TIME'] = pd.to_datetime(data_train['C_TIME'])
        # data_train['time_diff'] = data_train['C_TIME'].diff()
        # dt_short = pd.Timedelta(minutes=15)
        # dt_long = pd.Timedelta(minutes=15 * 10)
        # dfs = self.missing_time_split(data_train, dt_short, dt_long)
        # miss_points = data_train[(data_train['time_diff'] > dt_short) & (data_train['time_diff'] < dt_long)]
        # miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
        # print("Re-checked: total number of points to interpolate:", miss_number)
        # dfs_train, dfs_test = self.data_fill(dfs)
        # from sklearn.model_selection import train_test_split
        # data_train, data_test = train_test_split(data_train, test_size=(1-self.opt.train_data_rate),
        #                                          random_state=self.opt.Model["random_seed"],
        #                                          shuffle=self.opt.shuffle_train_data)
        # envir = envir.iloc[:, :-1]
        data_train = self.norm_features(data_train, ['C_TIME'])
        self.feature_columns(data_train)
        return data_train
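
    # A minimal sketch of how the commented-out gap handling above could be
    # re-enabled, assuming a 15-minute sampling grid (hypothetical wiring, not
    # part of the current flow):
    #
    #   data_train['time_diff'] = data_train['C_TIME'].diff()
    #   dfs = self.missing_time_split(data_train,
    #                                 dt_short=pd.Timedelta(minutes=15),
    #                                 dt_long=pd.Timedelta(minutes=15 * 10))
    #   dfs_train, dfs_test = self.data_fill(dfs, test=[])  # test=[] assumed: no held-out segments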

    def get_test_data(self):
        data_test = pd.read_csv(os.path.join(self.opt.excel_data_path, self.opt.data_format["test"]))
        data_test['C_TIME'] = pd.to_datetime(data_test['C_TIME'])
        data_test = self.data_cleaning(data_test)
        # envir_cols = ['C_TIME', 'C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_OBLIQUER']
        # envir = envir.iloc[:, :-1]
        # drop_cols = ['C_TEMPERATURE120', 'C_TEMPERATURE130', 'C_TEMPERATURE140',
        #              'C_TEMPERATURE150', 'C_TEMPERATURE160', 'C_TEMPERATURE170',
        #              'C_TEMPERATURE180', 'C_TEMPERATURE190', 'C_TEMPERATURE200',
        #              'C_DIRECTION120', 'C_DIRECTION130', 'C_DIRECTION140',
        #              'C_DIRECTION150', 'C_DIRECTION160', 'C_DIRECTION170',
        #              'C_DIRECTION180', 'C_DIRECTION190', 'C_DIRECTION200',
        #              'C_SPEED120', 'C_SPEED130', 'C_SPEED140', 'C_SPEED150',
        #              'C_SPEED160', 'C_SPEED170', 'C_SPEED180',
        #              'C_SPEED190', 'C_SPEED200'
        #              ]
        preserve = ['C_TIME', 'C_RADIATION', 'C_SURFACE_PRESSURE', 'C_HUMIDITY2', 'C_TEMPERATURE2',
                    'C_TEMPERATURE10', 'C_TEMPERATURE30', 'C_TEMPERATURE50', 'C_TEMPERATURE70',
                    'C_TEMPERATURE80', 'C_TEMPERATURE90', 'C_TEMPERATURE110', 'C_DIRECTION10',
                    'C_DIRECTION30', 'C_DIRECTION50', 'C_DIRECTION70', 'C_DIRECTION80',
                    'C_DIRECTION90', 'C_DIRECTION110', 'C_SPEED10', 'C_SPEED30', 'C_SPEED50',
                    'C_SPEED70', 'C_SPEED80', 'C_SPEED90', 'C_SPEED110', 'C_DNI_CALCD',
                    'C_SOLAR_ZENITH', 'C_CLEARSKY_GHI', 'C_LCC', 'C_MCC', 'C_HCC', 'C_TCC',
                    'C_TPR', 'col1_power', 'col2_power', 'sum_power', 'C_VALUE']
        data_test = data_test.loc[:, preserve]
        data_test = self.norm_features(data_test, ['C_TIME', 'sum_power', 'C_VALUE'])
        self.feature_columns(data_test)
        return data_test
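
    # Note: 'C_TIME', 'sum_power' and 'C_VALUE' are passed as drop_list above and
    # therefore stay un-normalized; presumably 'C_VALUE' is the prediction target
    # and 'sum_power' an aggregate power label (an assumption from the names).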

    def feature_columns(self, data):
        self.opt.columns_lstm = list(data.loc[:, 'C_RADIATION':'C_TPR'])
        self.opt.input_size_lstm = len(self.opt.columns_lstm)
        self.opt.input_size_cnn = 1
        print("cnn:", self.opt.columns_cnn)
        print("lstm:", self.opt.columns_lstm)
        print("opt column names configured!", self.opt.columns_cnn, self.opt.columns_lstm)

    def norm_features(self, data, drop_list):
        data1_ = data.drop(drop_list, axis=1, errors='ignore')
        columns = list(data1_.columns)
        mean = np.array([self.opt.mean[col] for col in columns])
        std = np.array([self.opt.std[col] for col in columns])
        new = []
        for i, d in data1_.iterrows():
            d = (d - mean) / std  # z-score normalization
            new.append(d)
        new = pd.concat(new, axis=1).T
        for col in new.columns:  # verbose is not necessarily worse than concise
            data[col] = new[col]
        return data
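
    # The row loop above is a plain z-score; a vectorized equivalent (a sketch,
    # assuming self.opt.mean / self.opt.std are dicts keyed by column name):
    #
    #   cols = data.columns.difference(drop_list)
    #   data[cols] = (data[cols] - pd.Series(self.opt.mean)[cols]) / pd.Series(self.opt.std)[cols]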

    def data_cleaning(self, data, clean_value=[-9999.0, -99]):
        for val in clean_value:
            data = data.replace(val, np.nan)
        # drop columns with more than 20% NaN (keep only columns with >= 80% valid values)
        data = data.dropna(axis=1, thresh=int(len(data) * 0.8))
        # drop columns whose values are all identical
        data = data.loc[:, (data != data.iloc[0]).any()]
        # back-fill the remaining NaNs (note: 'bfill' back-fills rather than linearly interpolating)
        data = data.interpolate(method='bfill')
        return data
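
    # Illustrative sketch of the cleaning rules on a toy frame:
    #
    #   df = pd.DataFrame({'a': [-9999.0, 1.0, 2.0],  # sentinel -> NaN, then back-filled to 1.0
    #                      'b': [5, 5, 5],            # constant column -> dropped
    #                      'c': [1, 2, 3]})
    #   cleaned = data_process(opt).data_cleaning(df)
    #   # cleaned has columns 'a' = [1.0, 1.0, 2.0] and 'c' = [1, 2, 3]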

    def missing_time_split(self, df, dt_short, dt_long):
        n_long, n_short, n_points = 0, 0, 0
        start_index = 0
        dfs = []
        for i in range(1, len(df)):
            if df['time_diff'][i] >= dt_long:
                df_long = df.iloc[start_index:i, :-1]
                dfs.append(df_long)
                start_index = i
                n_long += 1
            if df['time_diff'][i] > dt_short:
                print(df['C_TIME'][i-1], end=" ~ ")
                print(df['C_TIME'][i], end=" ")
                points = df['time_diff'].dt.total_seconds()[i]/(60*15) - 1
                print("missing points:", points)
                if df['time_diff'][i] < dt_long:
                    n_short += 1
                    n_points += points
                    print("points to fill:", points)
        dfs.append(df.iloc[start_index:, :-1])
        print("total rows:", len(df), ", gaps in the series:", n_short, ", of which long gaps:", n_long)
        print("total points to fill:", n_points)
        return dfs
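
    # The gap arithmetic assumes a 15-minute grid: a 60-minute gap between two
    # consecutive rows yields 60*60/(60*15) - 1 = 3 missing points between them.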

    def data_fill(self, dfs, test):
        dfs_train, dfs_test, inserts = [], [], 0
        for i, df in enumerate(dfs):
            df1 = df.set_index('C_TIME', inplace=False)
            dff = df1.resample('15T').bfill()
            dff.reset_index(inplace=True)
            points = len(dff) - len(df1)
            if i not in test:
                if i == 0:
                    dff = dff.iloc[8:, :].reset_index(drop=True)
                dfs_train.append(dff)
                print("{} ~ {} has {} points, {} filled in.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
                inserts += points
            else:
                print("{} ~ {} has {} points, {} missing. (test set)".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
                dfs_test.append(dfs[i].reset_index(drop=True))
        print("the training set was split into {} segments".format(len(dfs_train)))
        return dfs_train, dfs_test
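
    # resample('15T').bfill() re-grids each segment to 15-minute steps and fills
    # new rows from the next observation; an illustrative sketch:
    #
    #   s = pd.Series([1.0, 4.0],
    #                 index=pd.to_datetime(['2023-03-17 10:00', '2023-03-17 10:45']))
    #   s.resample('15T').bfill()  # 10:15 and 10:30 are created and filled with 4.0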

    def drop_duplicated(self, df):
        df = df.groupby(level=0).mean()  # deduplicate the DatetimeIndex by averaging duplicate timestamps
        return df
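
    # Sketch: with a duplicated DatetimeIndex, groupby(level=0).mean() keeps one
    # row per timestamp, e.g. two rows at 10:00 with values 2 and 4 collapse to
    # a single 10:00 row with value 3.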

    def read_data(self, path, cols=None, index_col=None):
        init_data = pd.read_excel(path, usecols=cols, index_col=index_col, engine='openpyxl')
        return init_data


if __name__ == "__main__":
    from config import myargparse
    parse = myargparse(discription="training config", add_help=False)
    opt = parse.parse_args_and_yaml()
    ds = data_process(opt=opt)
    path = ds.opt.excel_data_path
    os.makedirs(os.path.join(path, 'process'), exist_ok=True)  # the 'process' subfolder is written to below
    excel_data_path = ds.opt.excel_data_path
    data_format = ds.opt.data_format
    # dq_path = excel_data_path + data_format["dq"].replace('.csv', '.xlsx')
    rp_path = excel_data_path + data_format["rp"].replace('.csv', '.xlsx')
    nwp_path = excel_data_path + data_format["nwp"].replace('.csv', '.xlsx')
    envir_path = excel_data_path + data_format["envir"].replace('.csv', '.xlsx')
    rp_columns = ['C_TIME', 'C_REAL_VALUE']  # TODO: ["'C_TIME'", "'C_REAL_VALUE'"] -- the exported CSV keeps the single quotes, and read_csv reads them verbatim
    nwp = ds.read_data(nwp_path)  # TODO: export the CSV in table-column order; read_csv reads in CSV-column order
    nwp = ds.data_cleaning(nwp)
    nwp.drop(['C_SYNC_TIME'], axis=1, inplace=True)
    # nwp.set_index('C_TIME', inplace=True)
    # nwp = ds.drop_duplicated(nwp)
    envir = ds.read_data(envir_path)  # TODO: export the CSV in table-column order; read_csv reads in CSV-column order
    envir = ds.data_cleaning(envir)
    # envir.set_index('C_TIME', inplace=True)
    # envir.drop(['C_WS_NO', 'C_SYNC_TIME'])
    # envir = ds.drop_duplicated(envir)
    rp = ds.read_data(rp_path, rp_columns)
    # rp.set_index('C_TIME', inplace=True)  # NaN can also end up as an index value
    rp = ds.data_cleaning(rp)
    # rp = ds.drop_duplicated(rp)
    dataframes = [nwp, rp, envir]
    # find the latest start time and the earliest end time across the three tables
    max_start_time = max(df['C_TIME'].min() for df in dataframes)
    min_end_time = min(df['C_TIME'].max() for df in dataframes)
    print(max_start_time)
    print(min_end_time)
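    # Why intersect: aligning all three tables to the common [max_start_time,
    # min_end_time] window ensures every retained timestamp has a counterpart in
    # nwp, rp and envir alike; rows outside the overlap are dropped below.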
    # trim each DataFrame to the overlapping range [max_start_time, min_end_time]
    for i, df in enumerate(dataframes):
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # make sure the time column is datetime
        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
        dataframes[i] = df_filtered.reset_index(drop=True)  # the trimmed frame replaces the original in the list
    dataframes[0].to_csv(os.path.join(path, 'process', 'nwp.csv'), index=False)
    dataframes[2].to_csv(os.path.join(path, 'process', 'envir.csv'), index=False)
    dataframes[1].to_csv(os.path.join(path, 'process', 'rp.csv'), index=False)