liudawei 9 months ago
commit
59fd4afa6b
2 changed files with 244 additions and 0 deletions
  1. 79 0
      data_cleaning.py
  2. 165 0
      turbine_cleaning.py

+ 79 - 0
data_cleaning.py

@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/10/11 11:00
+# file: data_cleaning.py
+# author: David
+# company: shenyang JY
+import numpy as np
+np.random.seed(42)
+
+
def cleaning(df, name, cols=None, dup=True):
    """Run the standard cleaning pipeline on a copy of *df*.

    :param df: input DataFrame (the original is left unmodified)
    :param name: label used only for progress logging
    :param cols: key columns to row-filter on; when None, fill NaNs instead
    :param dup: when True, collapse duplicate C_TIME rows first
    :return: cleaned copy of df
    """
    print("开始清洗:{}……".format(name))
    cleaned = data_column_cleaning(df.copy())
    if dup:
        cleaned = rm_duplicated(cleaned)
    # Either row-filter on the given key columns, or fill remaining NaNs.
    if cols is None:
        cleaned = interpolation(cleaned)
    else:
        cleaned = key_field_row_cleaning(cleaned, cols)
    return cleaned
+
+
def data_column_cleaning(data, clean_value=(-9999.0, -99, -99.0)):
    """Column-level cleaning.

    Replaces known sentinel values with NaN, drops columns that are more
    than 20% NaN, and drops constant-valued columns.

    :param data: input DataFrame
    :param clean_value: sentinel values to treat as missing; the default is
        a tuple to avoid the shared-mutable-default pitfall
    :return: DataFrame with offending columns removed
    """
    cols_pre = data.columns.to_list()
    # Map every sentinel to NaN in one pass instead of one replace per value.
    data = data.replace(list(clean_value), np.nan)
    # Drop columns with fewer than 80% non-NaN entries.
    data = data.dropna(axis=1, thresh=len(data) * 0.8)
    # Drop columns whose values are all identical (carry no information).
    data = data.loc[:, (data != data.iloc[0]).any()]
    cols_late = data.columns.tolist()
    if len(cols_pre) > len(cols_late):
        print("清洗的列有:{}".format(set(cols_pre) - set(cols_late)))
    return data
+
+
def interpolation(data):
    """Fill any remaining NaNs.

    Despite the name, the strategy is a backward fill (each NaN takes the
    next valid value). A forward fill is applied afterwards so trailing
    NaNs — which bfill alone cannot reach — are filled too, leaving no NaNs
    when at least one valid value exists per column.

    :param data: DataFrame possibly containing NaNs
    :return: DataFrame with NaNs filled
    """
    return data.bfill().ffill()
+
+
def key_field_row_cleaning(data, cols):
    """Row-level cleaning on key fields.

    Drops rows where any key column holds a -99-style sentinel (negative
    value whose text contains "99") or is missing.

    :param data: input DataFrame
    :param cols: list of key column names to validate
    :return: DataFrame with offending rows removed
    """
    rows_pre = len(data)
    keep = None
    for col in cols:
        series = data.loc[:, col]
        # Sentinel: negative and its string form contains "99" (e.g. -99, -9999).
        is_sentinel = (series < 0) & series.astype(str).str.contains('99')
        col_ok = ~is_sentinel & series.notnull()
        keep = col_ok if keep is None else (keep & col_ok)
    if keep is not None:
        data = data[keep]
    rows_late = len(data)
    if rows_pre - rows_late > 0:
        print("清洗的行数有:", rows_pre - rows_late)
    return data
+
def rm_duplicated(data):
    """Collapse duplicate timestamps by averaging.

    Rows sharing the same C_TIME are merged into one row whose numeric
    columns hold the group mean; non-numeric columns are dropped, matching
    the legacy pandas nuisance-column behaviour.

    :param data: DataFrame with a C_TIME column
    :return: de-duplicated DataFrame with C_TIME restored as a column
    """
    rows_pre = len(data)
    # numeric_only=True keeps this working on pandas >= 2.0, where mean()
    # over non-numeric columns raises TypeError instead of dropping them.
    data = data.groupby(by='C_TIME').mean(numeric_only=True)
    data.reset_index(inplace=True)
    rows_late = len(data)
    if rows_pre - rows_late > 0:
        print("时间去重的行数有:", rows_pre - rows_late)
    return data

+ 165 - 0
turbine_cleaning.py

@@ -0,0 +1,165 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2024/6/21 13:49
+# file: turbine_cleaning.py
+# author: David
+# company: shenyang JY
+import os
+import pandas as pd
+from datetime import timedelta
+
+
+# ——————————————————————————机头风速-99和连续异常值清洗代码——————————————————————————————
def mark_abnormal_streaks(df, columns, min_streak):
    """Flag rows belonging to runs where *columns* stay constant.

    A run of at least *min_streak* consecutive rows whose values in every
    column of *columns* are identical is treated as a stuck-sensor streak;
    every row of such a run is marked True.

    :param df: input DataFrame
    :param columns: columns that must all be unchanged to extend a streak
    :param min_streak: minimum run length that counts as abnormal
    :return: boolean Series aligned with df.index
    """
    abnormal_mask = pd.Series(False, index=df.index)
    # Work on a positional copy so the logic does not depend on df having a
    # default RangeIndex (the original .at[i, col] lookups assumed one).
    vals = df[columns].reset_index(drop=True)
    streak_start = 0
    for i in range(len(vals)):
        # A change in any watched column starts a new streak.
        if i == 0 or any(vals.at[i - 1, col] != vals.at[i, col] for col in columns):
            streak_start = i
        # Streak long enough: mark the whole window ending at row i.
        if i - streak_start >= min_streak - 1:
            abnormal_mask.iloc[i - min_streak + 1:i + 1] = True
    return abnormal_mask
+
+
def remove_abnormal_values(df, N):
    """Drop -99 power rows and stuck-sensor streaks of length >= N.

    :param df: turbine DataFrame with C_WS, C_WD, C_ACTIVE_POWER columns
    :param N: minimum length of a constant run to treat as abnormal
    :return: (clean df, count of -99 rows, count of streak rows,
              total rows removed, dict of unique streak values per column)
    """
    # Rows whose power reading is the -99 sentinel.
    sentinel_mask = df['C_ACTIVE_POWER'] == -99
    sentinel_count = sentinel_mask.sum()

    # Rows inside runs where wind speed, direction and power are frozen.
    watched = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
    streak_mask = mark_abnormal_streaks(df, watched, N)
    streak_count = streak_mask.sum()

    combined = sentinel_mask | streak_mask
    # Record which values made up the frozen streaks, per watched column.
    streak_values = {col: df.loc[streak_mask, col].unique() for col in watched}
    clean_frame = df[~combined]
    return clean_frame, sentinel_count, streak_count, combined.sum(), streak_values
+
+
def process_csv_files(input_dir, output_dir, turbines_id, M, N):  # MBD: duplicate timestamps are not handled
    """Clean every turbine CSV in input_dir and write results to output_dir.

    :param input_dir: directory holding turbine-<id>.csv files
    :param output_dir: directory for cleaned files (created if absent)
    :param turbines_id: iterable of turbine ids to process
    :param M: unused here; kept for a uniform pipeline signature
    :param N: streak length passed through to remove_abnormal_values
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    for turbine in turbines_id:
        src = os.path.join(input_dir, f"turbine-{turbine}.csv")
        dst = os.path.join(output_dir, f"turbine-{turbine}.csv")

        frame = pd.read_csv(src)

        # Strip abnormal rows and collect the removal statistics.
        (clean_frame, sentinel_count, streak_count,
         removed_total, streak_values) = remove_abnormal_values(frame, N)

        # Report what was removed for this turbine.
        print(f"处理文件:{src}")
        print(f"剔除 -99 点异常值数量:{sentinel_count}")
        print(f"剔除连续异常值数量:{streak_count}")
        print(f"总共剔除数据量:{removed_total}")
        print(f"剔除的连续异常值具体数值:{streak_values}\n")

        clean_frame.to_csv(dst, index=False)
+
+
+# ——————————————————————————风机单机时间对齐——————————————————————————————
def TimeMerge(input_dir, output_dir, turbines_id, M):
    """Align all turbine CSVs on their common C_TIME values.

    Keeps only the timestamps present in every turbine file, rewrites each
    aligned per-turbine file, and also emits a wide turbines.csv holding
    one active-power column per turbine.

    :param input_dir: directory with per-turbine CSV files
    :param output_dir: destination directory (created if absent)
    :param turbines_id: iterable of turbine ids
    :param M: unused here; kept for a uniform pipeline signature
    """
    frames = [pd.read_csv(os.path.join(input_dir, f"turbine-{i}.csv"))
              for i in turbines_id]

    # Intersect the C_TIME columns of every file.
    common_times = set(frames[0]["C_TIME"])
    for frame in frames[1:]:
        common_times &= set(frame["C_TIME"])

    # Keep only rows whose timestamp every turbine shares.
    aligned = [frame[frame["C_TIME"].isin(common_times)] for frame in frames]

    os.makedirs(output_dir, exist_ok=True)
    power_cols, col_names = [], ['C_TIME']
    for frame, turbine in zip(aligned, turbines_id):
        frame.to_csv(os.path.join(output_dir, f"turbine-{turbine}.csv"), index=False)
        col_names.append('C_ACTIVE_POWER_{}'.format(turbine))
        power_cols.append(frame['C_ACTIVE_POWER'].reset_index(drop=True))

    # Assemble the wide power table: one timestamp column + one per turbine.
    power_cols.insert(0, aligned[0]['C_TIME'].reset_index(drop=True))
    wide = pd.concat(power_cols, axis=1)
    wide.columns = col_names
    wide.to_csv(os.path.join(output_dir, "turbines.csv"), index=False)
+
+
+# ——————————————————————————风机缺失点处理——————————————————————————————
def MissingPointProcessing(input_dir,output_dir, turbines_id, M,N):
    """Fill short gaps in each turbine's 15-minute time series.

    For every turbine CSV: find gaps where consecutive timestamps are more
    than 900 s apart; gaps of at most N missing points are filled by linear
    interpolation between the rows on either side. The gap-filled file is
    written to output_dir under the same name.

    :param input_dir: directory with turbine-<id>.csv files
    :param output_dir: destination directory (created if absent)
    :param turbines_id: iterable of turbine ids
    :param M: unused in this function  # NOTE(review): presumably kept for signature parity with the other pipeline steps — confirm
    :param N: maximum number of missing points a gap may contain to be filled
    """
    # Process each turbine file independently.
    for k in turbines_id:
        file_name = input_dir + '/' + f"turbine-{k}.csv"
        # file_name = os.path.join(input_dir, f"turbine-{k}.csv")
        # Read the CSV, parsing C_TIME into datetimes.
        data = pd.read_csv(file_name, parse_dates=['C_TIME'])

        # Seconds elapsed since the previous row (NaN for the first row).
        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()

        # Rows preceded by a gap (> 900 s, i.e. more than one 15-min step).
        missing_data_points = data[data['time_diff'] > 900]

        # Interpolated rows to be appended after the scan.
        filled_data = []

        # Report each gap's start time and size.
        print("缺失的开始时刻:")
        for index, row in missing_data_points.iterrows():
            # Timestamp of the last existing row before the gap.
            missing_start = row['C_TIME'] - timedelta(seconds=row['time_diff'])
            # Number of 15-min points missing inside the gap.
            missing_count = int(row['time_diff'] // 900) - 1

            # Fill only gaps of at most N points.  MBD: the fill code is verbose.
            if missing_count <= N:
                # NOTE(review): iloc[index - 1] uses the iterrows label as a
                # position — correct only while data keeps the default
                # RangeIndex produced by read_csv; confirm if that changes.
                prev_values = data.iloc[index - 1][['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
                next_values = row[['C_WS', 'C_WD', 'C_ACTIVE_POWER']]

                for i in range(1, missing_count + 1):
                    # Interpolation fraction between the bounding rows.
                    t = i / (missing_count + 1)
                    filled_time = missing_start + timedelta(minutes=15 * i)

                    # Linear interpolation of each measured field.
                    filled_values = {
                        'C_TIME': filled_time,
                        'C_WS': prev_values['C_WS'] + (next_values['C_WS'] - prev_values['C_WS']) * t,
                        'C_WD': prev_values['C_WD']+(next_values['C_WD']-prev_values['C_WD'])*t,
                        'C_ACTIVE_POWER': prev_values['C_ACTIVE_POWER'] + (
                                    next_values['C_ACTIVE_POWER'] - prev_values['C_ACTIVE_POWER']) * t,
                    }

                    # Wrap the interpolated wind direction into [-180, 180).
                    filled_values['C_WD'] = (filled_values['C_WD'] + 180) % 360 - 180

                    filled_data.append(filled_values)
                    print(f"填充的时间: {filled_time}, 填充的值: {filled_values}")

            print(f"{missing_start} - 缺失的点的数量: {missing_count}")

        # Append the interpolated rows to the original data.
        filled_df = pd.DataFrame(filled_data)
        data = pd.concat([data, filled_df], ignore_index=True)
        # Sort by time and rebuild a clean index.
        data = data.sort_values(by='C_TIME').reset_index(drop=True)

        # Report the total number of still-missing points (filled rows have
        # NaN time_diff, so only the original gap markers are counted here).
        missing_data_points = data[data['time_diff'] > 900]
        print(f"总缺失点数: {int(missing_data_points['time_diff'].sum() // 900) - len(missing_data_points)}")
        data.drop(columns=['time_diff'], inplace=True)
        os.makedirs(output_dir, exist_ok=True)
        output_path_name = os.path.join(output_dir, f"turbine-{k}.csv")
        print(output_path_name)
        # Save the gap-filled file.
        data.to_csv(output_path_name, index=False)