turbine_cleaning.py

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2024/6/21 13:49
# file: turbine_cleaning.py
# author: David
# company: shenyang JY
import os
import pandas as pd
from datetime import timedelta


# —————————————————————————— Cleaning of nacelle -99 readings and consecutive-constant outliers ——————————————————————————
def mark_abnormal_streaks(df, columns, min_streak):
    # Flag rows where all of the given columns stay unchanged for at least min_streak consecutive rows
    abnormal_mask = pd.Series(False, index=df.index)
    streak_start = None
    for i in range(len(df)):
        # The streak restarts whenever any monitored column changes from the previous row
        if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
            streak_start = i
        if i - streak_start >= min_streak - 1:
            abnormal_mask[i - min_streak + 1:i + 1] = True
    return abnormal_mask
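# Illustrative example (not from the original script): with min_streak=3 and a single
# monitored column, a run of identical values is flagged once it reaches three rows:
#     >>> demo = pd.DataFrame({'C_WS': [5.0, 5.0, 5.0, 7.0]})
#     >>> mark_abnormal_streaks(demo, ['C_WS'], 3).tolist()
#     [True, True, True, False]
# With several columns, the streak resets as soon as any one of them changes.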
def remove_abnormal_values(df, N):
    # Mark rows where C_ACTIVE_POWER is -99 as abnormal
    abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
    count_abnormal1 = abnormal_mask1.sum()
    # Mark rows where C_WS, C_WD and C_ACTIVE_POWER stay unchanged for N consecutive rows as abnormal
    columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
    abnormal_mask2 = mark_abnormal_streaks(df, columns, N)
    count_abnormal2 = abnormal_mask2.sum()
    # Combined boolean mask of all abnormal rows
    abnormal_mask = abnormal_mask1 | abnormal_mask2
    # Record the distinct values removed as consecutive-constant outliers
    removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
    # Drop the abnormal rows
    df_clean = df[~abnormal_mask]
    total_removed = abnormal_mask.sum()
    return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values
def process_csv_files(input_dir, output_dir, turbines_id, M, N):  # MBD: duplicate timestamps are not handled
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for i in turbines_id:
        input_file = os.path.join(input_dir, f"turbine-{i}.csv")
        output_file = os.path.join(output_dir, f"turbine-{i}.csv")
        # Read the CSV file
        df = pd.read_csv(input_file)
        # Remove abnormal values and collect statistics about them
        df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = remove_abnormal_values(df, N)
        # Report the statistics
        print(f"Processing file: {input_file}")
        print(f"-99 outliers removed: {count_abnormal1}")
        print(f"Consecutive-constant outliers removed: {count_abnormal2}")
        print(f"Total rows removed: {total_removed}")
        print(f"Values removed as consecutive-constant outliers: {removed_continuous_values}\n")
        # Save the cleaned CSV file
        df_clean.to_csv(output_file, index=False)
# —————————————————————————— Per-turbine time alignment ——————————————————————————
def TimeMerge(input_dir, output_dir, turbines_id, M):
    # Read all CSV files
    files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in turbines_id]
    dataframes = [pd.read_csv(f) for f in files]
    # Build the intersection of the C_TIME columns
    c_time_intersection = set(dataframes[0]["C_TIME"])
    for df in dataframes[1:]:
        c_time_intersection.intersection_update(df["C_TIME"])
    # Keep only the rows whose C_TIME lies in the intersection
    filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)] for df in dataframes]
    # Write each filtered DataFrame to a new CSV file and collect the power columns
    os.makedirs(output_dir, exist_ok=True)
    turbines_all, names = [], ['C_TIME']
    for (filtered_df, i) in zip(filtered_dataframes, turbines_id):
        # if i == 144:
        #     filtered_df['C_ACTIVE_POWER'] /= 1000
        filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
        names.append('C_ACTIVE_POWER_{}'.format(i))
        turbines_all.append(filtered_df['C_ACTIVE_POWER'].reset_index(drop=True))
    turbines_all.insert(0, filtered_dataframes[0]['C_TIME'].reset_index(drop=True))
    turbines_all = pd.concat(turbines_all, axis=1)
    turbines_all.columns = names
    turbines_all.to_csv(os.path.join(output_dir, "turbines.csv"), index=False)
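# The combined turbines.csv is laid out wide: a C_TIME column followed by one
# C_ACTIVE_POWER_<id> column per turbine in turbines_id
# (C_TIME, C_ACTIVE_POWER_1, C_ACTIVE_POWER_2, ...).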
# —————————————————————————— Turbine missing-point handling ——————————————————————————
def MissingPointProcessing(input_dir, output_dir, turbines_id, M, N):
    # Read the M turbine files one by one
    for k in turbines_id:
        file_name = os.path.join(input_dir, f"turbine-{k}.csv")
        # Read the CSV file
        data = pd.read_csv(file_name, parse_dates=['C_TIME'])
        # Time difference between consecutive rows, in seconds
        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
        # Rows preceded by a gap larger than one 15-minute step mark the missing stretches
        missing_data_points = data[data['time_diff'] > 900]
        # Filled timestamps and values are collected here
        filled_data = []
        # Report the start time of each gap and the number of missing points
        print("Start times of the gaps:")
        for index, row in missing_data_points.iterrows():
            missing_start = row['C_TIME'] - timedelta(seconds=row['time_diff'])
            missing_count = int(row['time_diff'] // 900) - 1
            # Fill the gap only if it contains no more than N points  MBD: the filling code is rather verbose
            if missing_count <= N:
                prev_values = data.iloc[index - 1][['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
                next_values = row[['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
                for i in range(1, missing_count + 1):
                    t = i / (missing_count + 1)
                    filled_time = missing_start + timedelta(minutes=15 * i)
                    # Linear interpolation between the rows on either side of the gap
                    filled_values = {
                        'C_TIME': filled_time,
                        'C_WS': prev_values['C_WS'] + (next_values['C_WS'] - prev_values['C_WS']) * t,
                        'C_WD': prev_values['C_WD'] + (next_values['C_WD'] - prev_values['C_WD']) * t,
                        'C_ACTIVE_POWER': prev_values['C_ACTIVE_POWER'] + (
                                next_values['C_ACTIVE_POWER'] - prev_values['C_ACTIVE_POWER']) * t,
                    }
                    # Wrap the wind direction into the range -180 to 180 degrees (e.g. 190 becomes -170)
                    filled_values['C_WD'] = (filled_values['C_WD'] + 180) % 360 - 180
                    filled_data.append(filled_values)
                    print(f"Filled time: {filled_time}, filled values: {filled_values}")
            print(f"{missing_start} - number of missing points: {missing_count}")
        # Insert the filled rows into the original data
        filled_df = pd.DataFrame(filled_data)
        data = pd.concat([data, filled_df], ignore_index=True)
        # Sort by time and reset the index
        data = data.sort_values(by='C_TIME').reset_index(drop=True)
        # Report the total number of missing points
        missing_data_points = data[data['time_diff'] > 900]
        print(f"Total missing points: {int(missing_data_points['time_diff'].sum() // 900) - len(missing_data_points)}")
        data.drop(columns=['time_diff'], inplace=True)
        os.makedirs(output_dir, exist_ok=True)
        output_path_name = os.path.join(output_dir, f"turbine-{k}.csv")
        print(output_path_name)
        # Save the interpolated file
        data.to_csv(output_path_name, index=False)
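# Minimal usage sketch (not part of the original script). The directory names,
# turbine ids, thresholds M/N and the ordering of the stages below are
# illustrative assumptions; the repository presumably wires these functions up
# elsewhere.
if __name__ == '__main__':
    raw_dir = './data/raw'          # hypothetical: turbine-<id>.csv files with C_TIME, C_WS, C_WD, C_ACTIVE_POWER
    cleaned_dir = './data/cleaned'  # stage 1 output: -99 and constant-streak outliers removed
    filled_dir = './data/filled'    # stage 2 output: short gaps linearly interpolated
    aligned_dir = './data/aligned'  # stage 3 output: per-turbine files plus the merged turbines.csv
    turbines_id = [1, 2, 3]         # hypothetical turbine numbers
    M, N = len(turbines_id), 5      # assumed: M = number of turbines, N = streak/gap threshold

    process_csv_files(raw_dir, cleaned_dir, turbines_id, M, N)
    MissingPointProcessing(cleaned_dir, filled_dir, turbines_id, M, N)
    TimeMerge(filled_dir, aligned_dir, turbines_id, M)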