import pandas as pd
import os
from datetime import timedelta
from inputData import DataBase
import Arg

#—————————————————— Cleaning of -99 readings and consecutive-constant streaks ——————————————————
def mark_abnormal_streaks(df, columns, min_streak):
    # Flag every row belonging to a run of at least min_streak consecutive rows
    # in which none of the monitored columns changes value.
    # Assumes the default RangeIndex produced by pd.read_csv.
    abnormal_mask = pd.Series(False, index=df.index)
    streak_start = None
    for i in range(len(df)):
        # A new streak starts at row 0 or whenever any monitored column changes.
        if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
            streak_start = i
        # Once the streak reaches min_streak rows, flag the whole trailing window.
        if i - streak_start >= min_streak - 1:
            abnormal_mask.iloc[i - min_streak + 1:i + 1] = True
    return abnormal_mask


def remove_abnormal_values(df, N):
    # Mark rows whose C_ACTIVE_POWER is -99 as abnormal
    abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
    count_abnormal1 = abnormal_mask1.sum()
    # Mark rows where C_WS, C_WD and C_ACTIVE_POWER stay unchanged for N consecutive rows
    columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
    abnormal_mask2 = mark_abnormal_streaks(df, columns, N)
    count_abnormal2 = abnormal_mask2.sum()
    # Combined boolean mask of all abnormal rows
    abnormal_mask = abnormal_mask1 | abnormal_mask2
    # Record the distinct values involved in the consecutive-constant streaks
    removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
    # Drop the abnormal rows
    df_clean = df[~abnormal_mask]
    total_removed = abnormal_mask.sum()
    return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values


def process_csv_files(input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled
    # M is unused here; the turbine ids come from the module-level `arg`
    # created in the __main__ block below.
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    for i in arg.turbineloc:
        input_file = os.path.join(input_dir, f"turbine-{i}.csv")
        output_file = os.path.join(output_dir, f"turbine-{i}.csv")
        # Read the CSV file
        df = pd.read_csv(input_file)
        # Remove abnormal values and collect statistics about them
        df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = remove_abnormal_values(df, N)
        # Report the statistics
        print(f"Processing file: {input_file}")
        print(f"Rows removed as -99 outliers: {count_abnormal1}")
        print(f"Rows removed as consecutive-constant streaks: {count_abnormal2}")
        print(f"Total rows removed: {total_removed}")
        print(f"Values involved in the removed streaks: {removed_continuous_values}\n")
        # Save the cleaned CSV file
        df_clean.to_csv(output_file, index=False)
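
# The row-by-row loop in mark_abnormal_streaks is O(n) at the Python level. A
# minimal vectorized sketch of the same streak-flagging idea (an illustrative
# alternative, not the function the pipeline above calls): rows are grouped
# into runs of identical values with shift()/cumsum(), and every row in a run
# of at least min_streak rows is flagged.
def mark_abnormal_streaks_vectorized(df, columns, min_streak):
    # A new run starts wherever any monitored column changes value.
    changed = (df[columns] != df[columns].shift()).any(axis=1)
    run_id = changed.cumsum()
    # Size of the run each row belongs to.
    run_len = df.groupby(run_id)[columns[0]].transform('size')
    return run_len >= min_streak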

#—————————————————— Per-turbine time alignment ——————————————————
def TimeMerge(input_dir, output_dir, M):
    # Read every turbine's CSV file
    files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in arg.turbineloc]
    dataframes = [pd.read_csv(f) for f in files]
    # Intersect the C_TIME columns across all turbines
    c_time_intersection = set(dataframes[0]["C_TIME"])
    for df in dataframes[1:]:
        c_time_intersection.intersection_update(df["C_TIME"])
    # Keep only rows whose C_TIME is in the intersection (copy to avoid
    # SettingWithCopyWarning on the in-place scaling below)
    filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)].copy() for df in dataframes]
    # Write each filtered DataFrame to a new CSV file
    os.makedirs(output_dir, exist_ok=True)
    for (filtered_df, i) in zip(filtered_dataframes, arg.turbineloc):
        if i == 144:
            # Turbine 144's power is off by a factor of 1000; rescale it
            filtered_df['C_ACTIVE_POWER'] /= 1000
        filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)


#—————————————————— Per-turbine missing-point filling ——————————————————
def MissingPointProcessing(input_dir, output_dir, M, N):
    # Process each turbine file
    for k in arg.turbineloc:
        file_name = os.path.join(input_dir, f"turbine-{k}.csv")
        # Read the CSV file
        data = pd.read_csv(file_name, parse_dates=['C_TIME'])
        # Time difference between consecutive rows, in seconds
        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
        # Rows preceded by a gap longer than one 15-minute (900 s) step
        missing_data_points = data[data['time_diff'] > 900]
        # Interpolated timestamps and values to insert
        filled_data = []
        # Report the start time and size of each gap
        print("Start of each gap:")
        for index, row in missing_data_points.iterrows():
            missing_start = row['C_TIME'] - timedelta(seconds=row['time_diff'])
            missing_count = int(row['time_diff'] // 900) - 1
            # Fill the gap only if at most N points are missing. MBD: the filling code is verbose.
            if missing_count <= N:
                prev_values = data.iloc[index - 1][['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
                next_values = row[['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
                for i in range(1, missing_count + 1):
                    t = i / (missing_count + 1)
                    filled_time = missing_start + timedelta(minutes=15 * i)
                    filled_values = {
                        'C_TIME': filled_time,
                        'C_WS': prev_values['C_WS'] + (next_values['C_WS'] - prev_values['C_WS']) * t,
                        'C_WD': prev_values['C_WD'] + (next_values['C_WD'] - prev_values['C_WD']) * t,
                        'C_ACTIVE_POWER': prev_values['C_ACTIVE_POWER'] + (
                                next_values['C_ACTIVE_POWER'] - prev_values['C_ACTIVE_POWER']) * t,
                    }
                    # Wrap the wind direction into the [-180, 180) range
                    filled_values['C_WD'] = (filled_values['C_WD'] + 180) % 360 - 180
                    filled_data.append(filled_values)
                    print(f"Filled time: {filled_time}, filled values: {filled_values}")
            print(f"{missing_start} - number of missing points: {missing_count}")
        # Insert the filled rows into the original data
        filled_df = pd.DataFrame(filled_data)
        data = pd.concat([data, filled_df], ignore_index=True)
        # Sort by time and reset the index
        data = data.sort_values(by='C_TIME').reset_index(drop=True)
        # Recompute the diffs so the remaining-gap count reflects the filled data
        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
        # Report how many points are still missing
        missing_data_points = data[data['time_diff'] > 900]
        print(f"Total missing points: {int(missing_data_points['time_diff'].sum() // 900) - len(missing_data_points)}")
        data.drop(columns=['time_diff'], inplace=True)
        os.makedirs(output_dir, exist_ok=True)
        output_path_name = os.path.join(output_dir, f"turbine-{k}.csv")
        print(output_path_name)
        # Save the interpolated file
        data.to_csv(output_path_name, index=False)
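
# MissingPointProcessing interpolates C_WD linearly and then wraps the result
# into [-180, 180), which walks the long way around the circle whenever a gap
# crosses the +/-180 degree boundary. A minimal shortest-arc sketch
# (illustration only, not the method used above):
def interp_angle(prev_deg, next_deg, t):
    # Fold the difference into [-180, 180) so the interpolation takes the short arc.
    diff = (next_deg - prev_deg + 180) % 360 - 180
    return (prev_deg + diff * t + 180) % 360 - 180

# e.g. interp_angle(170, -170, 0.5) gives -180 (the short way through the
# boundary), where the naive linear form gives 0.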

#—————————————————— Per-turbine continuous-segment extraction ——————————————————
def top_n_continuous_periods(data, n):
    # Collect every run of consecutive 15-minute (900 s) samples
    continuous_periods = []
    continuous_start = data['C_TIME'].iloc[0]
    continuous_count = 1
    for i in range(1, len(data)):
        if data['time_diff'].iloc[i] == 900:
            continuous_count += 1
        else:
            continuous_periods.append({
                'start': continuous_start,
                'end': data['C_TIME'].iloc[i - 1],
                'count': continuous_count
            })
            continuous_start = data['C_TIME'].iloc[i]
            continuous_count = 1
    continuous_periods.append({
        'start': continuous_start,
        'end': data['C_TIME'].iloc[-1],
        'count': continuous_count
    })
    # Longest runs first
    continuous_periods.sort(key=lambda x: x['count'], reverse=True)
    return continuous_periods[:n]


def Continuous_Data(input_dir, output_dir, M, TopN):
    # Read each turbine's CSV file
    for k in arg.turbineloc:
        path_dir = f"turbine-{k}.csv"
        input_path = os.path.join(input_dir, path_dir)
        data = pd.read_csv(input_path, parse_dates=['C_TIME'])
        data = data.sort_values(by='C_TIME').reset_index(drop=True)
        # Time difference between consecutive rows, in seconds
        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
        # Get the Top N longest continuous segments
        top_n = TopN
        top_n_periods = top_n_continuous_periods(data, top_n)
        data.drop(columns=['time_diff'], inplace=True)
        # Report each segment's size, start time and end time
        print(f"Top {top_n} continuous segments:")
        for i, period in enumerate(top_n_periods):
            print(f"{i + 1}. Start: {period['start']} - End: {period['end']} - Rows: {period['count']}")
            mask = (data['C_TIME'] >= period['start']) & (data['C_TIME'] <= period['end'])
            filtered_df = data.loc[mask]
            # Name the file after the actual size of the extracted segment
            output_file = f"turbine-{k}_{filtered_df.shape[0]}.csv"
            output_folder = f"Continuous_Turbine_Data_{period['count']}_{period['start'].strftime('%y-%m-%d-%H-%M')}_{period['end'].strftime('%y-%m-%d-%H-%M')}"
            output_folder = os.path.join(output_dir, output_folder)
            if not os.path.exists(output_folder):
                os.makedirs(output_folder)
            # Save the extracted segment to a new CSV file
            filtered_df.to_csv(os.path.join(output_folder, output_file), index=False)
        print(f"Processed {input_path}")


if __name__ == "__main__":
    arg = Arg.Arg()
    db = DataBase(arg=arg)
    db.data_process()

    # input_dir = "../data/314/turbine-15"  # input folder
    # output_dir = "../data/314/output_clean_csv_files"  # output folder
    # Clean -99 readings and consecutive-constant streaks in the nacelle data.
    # This step creates an "output_clean_csv_files" folder holding every turbine's
    # data; 50 is the number of turbines + 1, and 5 means any run of 5 identical
    # consecutive points is treated as abnormal and removed.
    # process_csv_files(input_dir, output_dir, 50, 5)

    # output_dir_time_Merge = "../data/314/output_filtered_csv_files"
    # This step creates an "output_filtered_csv_files" folder: building on the
    # previous step, it aligns all turbines in time, keeping only the common timestamps.
    # TimeMerge(output_dir, output_dir_time_Merge, 50)

    # output_complete_data = "../data_mts/complete_data"
    # This step creates a "complete_data" folder: building on the previous step,
    # it fills gaps of up to 10 time steps.
    # MissingPointProcessing(output_dir_time_Merge, output_complete_data, 50, 10)

    # continuous_time = "../data_mts/continuous_data"
    # This step creates a "continuous_data" folder: building on the previous step,
    # it extracts the Top 10 longest continuous segments per turbine.
    # Continuous_Data(output_complete_data, continuous_time, 50, 10)
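
# A minimal, self-contained sanity check for top_n_continuous_periods on
# synthetic timestamps (illustration only, not part of the pipeline): the
# 6-point run should come back ahead of the 3-point run.
def _demo_top_n_continuous_periods():
    times = list(pd.date_range("2023-01-01", periods=6, freq="15min"))
    times += list(pd.date_range("2023-01-02", periods=3, freq="15min"))
    demo = pd.DataFrame({'C_TIME': times})
    demo['time_diff'] = demo['C_TIME'].diff().dt.total_seconds()
    return top_n_continuous_periods(demo, 2)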