main.py

import pandas as pd
import os
from datetime import timedelta
from inputData import DataBase
import Arg
#——————————————————————————Cleaning nacelle wind speed -99 sentinels and consecutive constant streaks——————————————————————————————
def mark_abnormal_streaks(df, columns, min_streak):
    # Flag every row belonging to a streak of at least min_streak consecutive
    # rows whose values in `columns` are all unchanged.
    abnormal_mask = pd.Series(False, index=df.index)
    streak_start = None
    for i in range(len(df)):
        # A streak restarts at row 0 or whenever any monitored column changes
        if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
            streak_start = i
        # Once the streak is long enough, flag the whole window
        if i - streak_start >= min_streak - 1:
            abnormal_mask[i - min_streak + 1:i + 1] = True
    return abnormal_mask
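# A minimal sketch of mark_abnormal_streaks on a toy frame (hypothetical values;
# the function assumes the default RangeIndex, since it indexes with df.at[i, col]):
#
#   demo = pd.DataFrame({'C_WS': [5.1, 7.0, 7.0, 7.0, 6.2]})
#   mark_abnormal_streaks(demo, ['C_WS'], min_streak=3).tolist()
#   # -> [False, True, True, True, False]: the three repeated 7.0 rows are flagged.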
def remove_abnormal_values(df, N):
    # Mark rows where C_ACTIVE_POWER equals the -99 sentinel as abnormal
    abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
    count_abnormal1 = abnormal_mask1.sum()
    # Mark rows where C_WS, C_WD and C_ACTIVE_POWER stay unchanged for N consecutive rows
    columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
    abnormal_mask2 = mark_abnormal_streaks(df, columns, N)
    count_abnormal2 = abnormal_mask2.sum()
    # Boolean mask covering all abnormal rows
    abnormal_mask = abnormal_mask1 | abnormal_mask2
    # Record the distinct values removed in the constant streaks
    removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
    # Drop the abnormal rows
    df_clean = df[~abnormal_mask]
    total_removed = abnormal_mask.sum()
    return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values
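# Usage sketch (hypothetical frame): with N=5,
#
#   df_clean, n_sentinel, n_streak, n_total, streak_vals = remove_abnormal_values(df, 5)
#
# n_total can be less than n_sentinel + n_streak because the two masks are OR-ed,
# so a row that is both a -99 sentinel and part of a constant streak is counted once.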
def process_csv_files(input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for i in arg.turbineloc:
        input_file = os.path.join(input_dir, f"turbine-{i}.csv")
        output_file = os.path.join(output_dir, f"turbine-{i}.csv")
        # Read the CSV file
        df = pd.read_csv(input_file)
        # Remove abnormal values and collect removal statistics
        df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = remove_abnormal_values(df, N)
        # Report the removal statistics
        print(f"Processing file: {input_file}")
        print(f"-99 outliers removed: {count_abnormal1}")
        print(f"Consecutive-constant outliers removed: {count_abnormal2}")
        print(f"Total rows removed: {total_removed}")
        print(f"Values removed in constant streaks: {removed_continuous_values}\n")
        # Save the cleaned CSV file
        df_clean.to_csv(output_file, index=False)
#——————————————————————————Per-turbine time alignment——————————————————————————————
def TimeMerge(input_dir, output_dir, M):
    # Read all turbine CSV files
    files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in arg.turbineloc]
    dataframes = [pd.read_csv(f) for f in files]
    # Intersect the C_TIME columns across all turbines
    c_time_intersection = set(dataframes[0]["C_TIME"])
    for df in dataframes[1:]:
        c_time_intersection.intersection_update(df["C_TIME"])
    # Keep only rows whose C_TIME lies in the intersection; .copy() avoids pandas'
    # chained-assignment warning when the power column is rescaled below
    filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)].copy() for df in dataframes]
    # Write each filtered DataFrame to a new CSV file
    os.makedirs(output_dir, exist_ok=True)
    for (filtered_df, i) in zip(filtered_dataframes, arg.turbineloc):
        # Turbine 144's power is on a different scale; divide by 1000 to match the rest
        if i == 144:
            filtered_df['C_ACTIVE_POWER'] /= 1000
        filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
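# Alignment sketch (hypothetical timestamps): if turbine A has C_TIME values
# {00:00, 00:15, 00:30} and turbine B has {00:15, 00:30, 00:45}, both output
# files keep only the shared rows at 00:15 and 00:30.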
#——————————————————————————Per-turbine missing-point processing——————————————————————————————
def MissingPointProcessing(input_dir, output_dir, M, N):
    # Read the M turbine files
    for k in arg.turbineloc:
        file_name = input_dir + '/' + f"turbine-{k}.csv"
        # file_name = os.path.join(input_dir, f"turbine-{k}.csv")
        # Read the CSV file
        data = pd.read_csv(file_name, parse_dates=['C_TIME'])
        # Gap in seconds between consecutive timestamps
        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
        # Rows preceded by a gap longer than one 15-minute step
        missing_data_points = data[data['time_diff'] > 900]
        # Interpolated rows to be inserted
        filled_data = []
        # Report the start time and size of each gap
        print("Start of missing spans:")
        for index, row in missing_data_points.iterrows():
            missing_start = row['C_TIME'] - timedelta(seconds=row['time_diff'])
            missing_count = int(row['time_diff'] // 900) - 1
            # Fill only gaps of at most N points  MBD: the filling code is verbose
            if missing_count <= N:
                prev_values = data.iloc[index - 1][['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
                next_values = row[['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
                for i in range(1, missing_count + 1):
                    # Linear interpolation between the rows bounding the gap
                    t = i / (missing_count + 1)
                    filled_time = missing_start + timedelta(minutes=15 * i)
                    filled_values = {
                        'C_TIME': filled_time,
                        'C_WS': prev_values['C_WS'] + (next_values['C_WS'] - prev_values['C_WS']) * t,
                        'C_WD': prev_values['C_WD'] + (next_values['C_WD'] - prev_values['C_WD']) * t,
                        'C_ACTIVE_POWER': prev_values['C_ACTIVE_POWER'] + (
                                next_values['C_ACTIVE_POWER'] - prev_values['C_ACTIVE_POWER']) * t,
                    }
                    # Wrap the wind direction into the [-180, 180) range
                    filled_values['C_WD'] = (filled_values['C_WD'] + 180) % 360 - 180
                    filled_data.append(filled_values)
                    print(f"Filled time: {filled_time}, filled values: {filled_values}")
            print(f"{missing_start} - number of missing points: {missing_count}")
        # Insert the filled rows into the original data
        filled_df = pd.DataFrame(filled_data)
        data = pd.concat([data, filled_df], ignore_index=True)
        # Sort by time and reset the index
        data = data.sort_values(by='C_TIME').reset_index(drop=True)
        # Report the total number of missing points (time_diff is not recomputed
        # after filling, so this reflects the gaps before interpolation)
        missing_data_points = data[data['time_diff'] > 900]
        print(f"Total missing points: {int(missing_data_points['time_diff'].sum() // 900) - len(missing_data_points)}")
        data.drop(columns=['time_diff'], inplace=True)
        os.makedirs(output_dir, exist_ok=True)
        output_path_name = os.path.join(output_dir, f"turbine-{k}.csv")
        print(output_path_name)
        # Save the interpolated file
        data.to_csv(output_path_name, index=False)
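# Worked fill example (hypothetical values): a 3600 s gap at 15-minute resolution
# hides 3600 // 900 - 1 = 3 points, interpolated at t = 1/4, 2/4, 3/4, so a C_WS
# going from 4.0 to 8.0 is filled with 5.0, 6.0 and 7.0. The direction wrap maps,
# e.g., an interpolated 350 to (350 + 180) % 360 - 180 = -10, keeping C_WD inside
# [-180, 180); note the wrap is applied after interpolating raw degrees, so a fill
# across the +/-180 seam still interpolates the long way around the circle.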
#——————————————————————————Per-turbine continuous time-segment splitting——————————————————————————————
def top_n_continuous_periods(data, n):
    # Collect runs of rows spaced exactly 900 s apart, then keep the n longest
    continuous_periods = []
    continuous_start = data['C_TIME'].iloc[0]
    continuous_count = 1
    for i in range(1, len(data)):
        if data['time_diff'].iloc[i] == 900:
            continuous_count += 1
        else:
            # The run is broken: record it and start a new one
            continuous_periods.append({
                'start': continuous_start,
                'end': data['C_TIME'].iloc[i - 1],
                'count': continuous_count
            })
            continuous_start = data['C_TIME'].iloc[i]
            continuous_count = 1
    # Record the final run
    continuous_periods.append({
        'start': continuous_start,
        'end': data['C_TIME'].iloc[-1],
        'count': continuous_count
    })
    continuous_periods.sort(key=lambda x: x['count'], reverse=True)
    return continuous_periods[:n]
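# Segmentation sketch (hypothetical timestamps): rows at 00:00, 00:15, 00:30,
# 01:30, 01:45 produce runs of 3 rows (00:00-00:30) and 2 rows (01:30-01:45);
# top_n_continuous_periods(data, 1) returns only the 3-row run.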
def Continuous_Data(input_dir, output_dir, M, TopN):
    # Read each turbine's CSV file
    for k in arg.turbineloc:
        path_dir = f"turbine-{k}.csv"
        input_path = os.path.join(input_dir, path_dir)
        data = pd.read_csv(input_path, parse_dates=['C_TIME'])
        data = data.sort_values(by='C_TIME').reset_index(drop=True)
        # Gap in seconds between consecutive timestamps
        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
        # Get the TopN longest continuous segments
        top_n = TopN
        top_n_periods = top_n_continuous_periods(data, top_n)
        data.drop(columns=['time_diff'], inplace=True)
        # Report each segment's size, start time and end time
        print(f"Top {top_n} continuous segments:")
        for i, period in enumerate(top_n_periods):
            print(f"{i + 1}. start: {period['start']} - end: {period['end']} - rows: {period['count']}")
            output_file = f"turbine-{k}_{period['count']}.csv"
            mask = (data['C_TIME'] >= period['start']) & (data['C_TIME'] <= period['end'])
            filtered_df = data.loc[mask]
            # Replace period['count'] in the file name with the actual slice size
            output_file = output_file.replace(str(period['count']), str(filtered_df.shape[0]))
            output_folder = f"Continuous_Turbine_Data_{period['count']}_{period['start'].strftime('%y-%m-%d-%H-%M')}_{period['end'].strftime('%y-%m-%d-%H-%M')}"
            output_folder = os.path.join(output_dir, output_folder)
            if not os.path.exists(output_folder):
                os.makedirs(output_folder)
            # Save the slice to a new CSV file
            # filtered_df.to_csv(output_dir, index=False)
            filtered_df.to_csv(os.path.join(output_folder, output_file), index=False)
        print(f"Processed {input_path}")
if __name__ == "__main__":
    arg = Arg.Arg()  # read as a module-level global by the functions above
    db = DataBase(arg=arg)
    db.data_process()
    # input_dir = "../data/314/turbine-15"  # input folder path
    # output_dir = "../data/314/output_clean_csv_files"  # output folder path
    # Clean the -99 sentinels and consecutive constant streaks in the nacelle wind speed;
    # the streak parameter means 5 unchanged values in a row are treated as abnormal.
    # This step creates an "output_clean_csv_files" folder holding every turbine's data;
    # 50 is the number of turbines + 1, and 5 means runs of 5 identical consecutive points
    # are treated as abnormal and removed entirely.
    # process_csv_files(input_dir, output_dir, 50, 5)
    # output_dir_time_Merge = "../data/314/output_filtered_csv_files"
    # This step creates an "output_filtered_csv_files" folder: building on the previous
    # step, it aligns all turbines' timestamps, keeping only their intersection.
    # TimeMerge(output_dir, output_dir_time_Merge, 50)
    # output_complete_data = "../data_mts/complete_data"
    # This step creates a "complete_data" folder: building on the previous step, it fills
    # gaps of up to 10 time points.
    # MissingPointProcessing(output_dir_time_Merge, output_complete_data, 50, 10)
    # continuous_time = "../data_mts/continuous_data"
    # This step creates a "continuous_data" folder: building on the previous step, it keeps
    # the Top 10 longest continuous segments per turbine.
    # Continuous_Data(output_complete_data, continuous_time, 50, 10)