liudawei 2 years ago
commit
5140303ed1
6 changed files with 725 additions and 0 deletions
  1. +12 -0 db-wind/README.md
  2. +361 -0 db-wind/getdata/inputData.py
  3. +80 -0 db-wind/getdata/splitdata.py
  4. +221 -0 db-wind/main.py
  5. +12 -0 db-wind/utils/Arg.py
  6. +39 -0 db-wind/utils/savedata.py

+ 12 - 0
db-wind/README.md

@@ -0,0 +1,12 @@
+# How to use
+- Just run main.py
+
+The main job is to read the relevant data from the database, save it locally as csv files, and apply some simple processing.
+The main functions are (see the sketch after this list for the typical call order):
+
+- clear_data(): delete all .csv files under data; run the code once first so that the files exist
+- get_process_NWP(database): fetch NWP data from the database and apply simple processing
+- get_process_turbine(database): fetch wind turbine data from the database and apply simple processing
+- get_process_tower(database): fetch wind-measurement tower data
+- get_process_power(database): fetch the overall power data
+- indep_process(): further processing: time alignment etc.
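
A minimal driver sketch of that call order (hypothetical; `data_process()` in `getdata/inputData.py` wires these same steps together, here shown with the database URL taken from `utils/Arg.py`):

```python
from getdata import inputData
from utils import Arg

arg = Arg.Arg()
inputData.clear_data()                       # wipe previously exported csv files
inputData.get_process_NWP(arg.database)      # NWP forecasts -> data/NWP.csv
inputData.get_process_turbine(arg.database)  # per-turbine csv files (all + 15-min)
inputData.get_process_tower(arg.database)    # wind tower csv files
inputData.get_process_power(arg.database)    # overall power -> data/power.csv
inputData.indep_process()                    # trim all tables to a common time range
```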

+ 361 - 0
db-wind/getdata/inputData.py

@@ -0,0 +1,361 @@
+import pymysql
+import pandas as pd
+import numpy as np
+from sqlalchemy import create_engine
+import matplotlib.pyplot as plt
+import pytz
+plt.rcParams['font.sans-serif'] = ['SimHei']
+import utils.savedata
+from utils import Arg
+arg = Arg.Arg()
+
+def clear_data():
+    """
+    Delete all exported csv files
+    :return:
+    """
+    # folder that holds the exported data
+    import glob
+    import os
+    folder_path = arg.dataloc
+
+    # collect every .csv file path with glob
+    csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
+
+    # delete each .csv file
+    for file_path in csv_files:
+        os.remove(file_path)
+    print("All csv files removed")
+
+def create_database(database):
+    """
+    Create the database connection
+    :param database: database URL
+    :return:
+    """
+    engine = create_engine(database)
+    return engine
+
+def exec_sql(sql, engine):
+    """
+    Run a query and return the result as a DataFrame
+    :param sql: SQL statement
+    :param engine: database engine
+    :return:
+    """
+    df = pd.read_sql_query(sql, engine)
+    return df
+
+def get_process_NWP(database):
+    """
+    Fetch NWP data from the database and apply simple processing
+    :param database:
+    :return:
+    """
+    # NWP data
+    engine = create_database(database)
+    sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
+    NWP = exec_sql(sql_NWP, engine)
+
+    # drop the last three digits (the millisecond part of the epoch timestamp)
+    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(str)
+    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].str[:-3]
+
+    # convert the timestamp column to datetime
+    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(float)
+    NWP['C_PRE_TIME'] = pd.to_datetime(NWP['C_PRE_TIME'], unit='s')
+
+    # convert to the local time zone
+    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.tz_localize(pytz.utc).dt.tz_convert('Asia/Shanghai')
+
+    # format as year-month-day hour:minute:second
+    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+
+    NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
+
+    utils.savedata.saveData("NWP.csv", NWP)
+    return NWP
+
+def get_process_turbine(database):
+    """
+    Fetch wind turbine data from the database and apply simple processing
+    :param database:
+    :return:
+    """
+
+    # load the NWP timestamps
+    NWP = utils.savedata.readData("NWP.csv")
+    NWP_date = NWP.iloc[:,0]
+    print(NWP_date)
+
+    # turbine data
+    engine = create_database(database)
+    for i in arg.turbineloc:
+        print("Exporting data for turbine {}".format(i))
+        sql_turbine = "select C_TIME,C_WS,C_WD,C_ACTIVE_POWER from t_wind_turbine_status_data WHERE C_EQUIPMENT_NO=" + str(i) #+ " and C_WS>0 and C_ACTIVE_POWER>0"
+        turbine = exec_sql(sql_turbine, engine)
+
+        # export all rows as-is
+        utils.savedata.saveData("turbine-all/turbine-{}.csv".format(i), turbine)
+
+        # keep one record per 15 minutes (timestamps that match the NWP series)
+        filtered_df = turbine[turbine['C_TIME'].isin(NWP_date)]
+        utils.savedata.saveData("turbine-15/turbine-{}.csv".format(i), filtered_df)
+
+def get_process_tower(database):
+    """
+    Fetch wind-measurement tower data
+    :param database:
+    :return:
+    """
+    engine = create_database(database)
+    print("Available towers: {}".format(arg.towerloc))
+    for i in arg.towerloc:
+        print("Exporting data for tower {}".format(i))
+        # columns we do not need
+        drop_columns = ["C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"]
+
+        get_columns = []
+        # query all column names of the table
+        result_set = exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
+        for name in result_set.iloc[:,0]:
+            if name not in drop_columns:
+                get_columns.append(name)
+
+        all_columns_str = ", ".join(get_columns)
+
+        tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO=" + str(i)
+        tower = exec_sql(tower_sql, engine)
+        utils.savedata.saveData("tower/tower-{}.csv".format(i), tower)
+
+def get_process_power(database):
+    """
+    Fetch the overall power data
+    :param database:
+    :return:
+    """
+    engine = create_database(database)
+    sql_power = "select C_TIME,C_REAL_VALUE from t_power_station_status_data"
+    power = exec_sql(sql_power, engine)
+    utils.savedata.saveData("power.csv", power)
+
+def get_turbine_info(database):
+    """
+    Fetch turbine metadata (position and hub height)
+    :param database:
+    :return:
+    """
+    engine = create_engine(database)
+    sql_turbine = "select C_ID, C_LATITUDE as '纬度', C_LONGITUDE as '经度', C_HUB_HEIGHT as '轮毂高度' from t_wind_turbine_info"
+    turbine_info = exec_sql(sql_turbine, engine)
+    utils.savedata.saveData("风机信息.csv", turbine_info)
+
+def indep_process():
+    """
+    Further processing: time alignment etc.
+    :return:
+    """
+    # wind tower data
+    for i in arg.towerloc:
+        tower = utils.savedata.readData("/tower/tower-{}.csv".format(i))
+        # check which columns are entirely -99
+        all_minus_99 = (tower == -99).all()
+
+        # names of the columns whose every value is -99
+        cols_to_drop = all_minus_99[all_minus_99].index.tolist()
+
+        # drop them  MBD: columns that are only partly -99 are not handled yet
+        tower = tower.drop(cols_to_drop, axis=1)
+        utils.savedata.saveData("/tower/tower-{}-process.csv".format(i), tower)
+
+    # align the tower timestamps
+    tower1 = utils.savedata.readData("/tower/tower-{}-process.csv".format(1))
+    # tower2 = utils.savedata.readData("/tower/tower-{}-process.csv".format(2))
+    # tower1 = tower1[tower1['C_TIME'].isin(tower2['C_TIME'])]
+    # tower2 = tower2[tower2['C_TIME'].isin(tower1['C_TIME'])]
+
+    utils.savedata.saveData("/tower/tower-{}-process.csv".format(1), tower1)
+    # utils.savedata.saveData("/tower/tower-{}-process.csv".format(2), tower2)
+
+    # align the timestamps of all tables  MBD: power and tower are not fully unified yet
+    filenames = ["/NWP.csv","/power.csv", '/tower/tower-1-process.csv']
+    dataframes = []
+    for i in arg.turbineloc:
+        filenames.append("/turbine-15/turbine-{}.csv".format(i))
+    for name in filenames:
+        dataframes.append(utils.savedata.readData(name))
+
+    # find the latest start time and the earliest end time
+    max_start_time = max(df['C_TIME'].min() for df in dataframes)
+    min_end_time = min(df['C_TIME'].max() for df in dataframes)
+
+    print(max_start_time)
+    print(min_end_time)
+
+    # trim each DataFrame to the interval [max_start_time, min_end_time]
+    for i, df in enumerate(dataframes):
+        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # make sure the time column is datetime
+        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
+
+        # overwrite the original file with the trimmed data
+        utils.savedata.saveData(filenames[i], df_filtered)
+
+def NWP_indep_process():
+    """
+    Split the NWP data into N datasets at the gaps left by missing values
+    :return:
+    """
+    # further NWP processing
+    NWP = utils.savedata.readData("NWP.csv")
+    df = pd.to_datetime(NWP['C_TIME'])
+    time_diff = df.diff()
+    time_diff_threshold = pd.Timedelta(minutes=15)
+    missing_values = df[time_diff > time_diff_threshold]
+    print("Number of gaps in the NWP data: {}".format(len(missing_values)))
+    print(missing_values)
+
+    # persist the gap locations
+    utils.savedata.saveVar("NWP_miss.pickle", missing_values)
+    split_indices = []
+    for i in range(len(missing_values)):
+        if i == 0:
+            split_indices.append((0, missing_values.index[i]))
+        else:
+            split_indices.append((missing_values.index[i - 1], missing_values.index[i]))  # MBD: this split logic is questionable
+    split_indices.append((missing_values.index[-1], len(df)))  # MBD: changed by liudawei
+    split_datasets = [NWP.iloc[start:end,:] for start, end in split_indices]
+    for i, split_df in enumerate(split_datasets):
+        utils.savedata.saveData("Dataset_training/NWP/NWP_{}.csv".format(i),split_df)
+
+    return split_datasets
+
+
+# def power_indep_process():
+#     NWP = utils.savedata.readData("power.csv")
+
+def Data_split():
+    """
+    Unused; safe to skip
+    :return:
+    """
+    NWP = utils.savedata.readData("power_15min.csv")
+    df = pd.to_datetime(NWP['C_TIME'])
+    time_diff = df.diff()
+    time_diff_threshold = pd.Timedelta(minutes=15)
+    missing_values = df[time_diff > time_diff_threshold]
+    print("Number of gaps in the data: {}".format(len(missing_values)))
+    print(missing_values)
+    NWP_miss = utils.savedata.readVar("NWP_miss.pickle")
+
+    for t in missing_values.index:
+        a = t-1
+        b = t
+        time1 = NWP['C_TIME'][a]
+        time2 = NWP['C_TIME'][b]
+        df = pd.to_datetime([time1, time2])
+        # gap size in units of 15 minutes
+        time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
+        print(time_diff)
+
+    time1 = "2022-10-27 14:00:00"
+    time2 = "2023-04-16 12:00:00"
+    df = pd.to_datetime([time1, time2])
+    # time difference in units of 15 minutes
+    time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
+    print(time_diff)
+
+def time_all_in():
+    """
+    Currently unused. Meant to pad the turbine data: find the missing timestamps and fill them with -99
+    :return:
+    """
+    filenames = []
+    dataframes = []
+    for i in arg.turbineloc:
+        filenames.append("/turbine-15/turbine-{}.csv".format(i))
+    for name in filenames:
+        dataframes.append(utils.savedata.readData(name))
+
+    for df in dataframes:
+        df['C_TIME'] = pd.to_datetime(df['C_TIME'])
+
+        # build a complete time index covering every possible 15-min timestamp
+        start_time = df['C_TIME'].min()
+        end_time = df['C_TIME'].max()
+        full_time_range = pd.date_range(start_time, end_time, freq='15min')
+
+        # create an empty DataFrame over the full time index
+        full_df = pd.DataFrame(index=full_time_range)
+        full_df.index.name = 'C_TIME'
+
+        # merge the original data onto the full time index (merging on 'time' was a bug: the column is 'C_TIME')
+        merged_df = full_df.join(df.set_index('C_TIME'), how='left')
+
+        # fill the gaps with -99; the time column lives in the index, so it stays intact
+        merged_df.fillna(-99, inplace=True)
+        merged_df.reset_index(inplace=True)
+
+
+
+def data_process(database):
+    """
+    Top-level driver for the data export and initial processing
+    :param database:
+    :return:
+    """
+    # clear_data()
+    # get_process_NWP(database)
+    # get_process_turbine(database)
+    # get_turbine_info(database)
+    # get_process_tower(database)
+    # get_process_power(database)
+    # indep_process()
+    NWP_indep_process()
+    #Data_split()
+
+if __name__ == '__main__':
+
+    import os
+    import glob
+    # folder with previously exported data
+    folder_path = '../data'
+
+    # collect all .csv files with glob
+    csv_files = glob.glob(os.path.join(folder_path, '*.csv'))
+
+    # delete each .csv file
+    for file_path in csv_files:
+        os.remove(file_path)
+    # database = "mysql+pymysql://root:!QAZ2root@192.168.1.205:3306/ipfcst-sishui-a"
+    # engine = create_database(database)
+    #
+    # # NWP data
+    # sql_NWP = "select C_SC_DATE,C_SC_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
+    # NWP = exec_sql(sql_NWP,engine)
+    #
+    # # per-turbine power
+    # sql_wind = "select C_WS,C_ACTIVE_POWER from t_wind_turbine_status_data_15 WHERE C_EQUIPMENT_NO=2 and C_WS>0 and C_ACTIVE_POWER>0  "
+    # df_wind = exec_sql(sql_wind,engine)
+    # print(df_wind)
+
+    # read the overall power data
+    # sql_power = "select * from t_power_station_status_data"
+    # df_power = exec_sql(sql_power, engine)
+    filenames = []
+    dataframes = []
+    for i in arg.turbineloc:
+        filenames.append("../data/turbine-15/turbine-{}.csv".format(i))
+    for name in filenames:
+        dataframes.append(pd.read_csv(name).iloc[:7000,:])
+
+    mean_of_first_columns = pd.concat([df['C_WS'] for df in dataframes], axis=1).mean(axis=1)
+    mean_of_second_columns = (pd.concat([df['C_ACTIVE_POWER'] for df in dataframes], axis=1).sum(axis=1)/1000).astype(int)
+    print(len(mean_of_first_columns))
+
+
+    plt.scatter(mean_of_first_columns, mean_of_second_columns)
+    plt.show()
+
+
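
As a side note on the timestamp handling in get_process_NWP above, here is a self-contained sketch of the same millisecond-epoch to Asia/Shanghai conversion (the epoch values are invented for illustration):

```python
import pandas as pd
import pytz

s = pd.Series(["1650000000123", "1650000900456"])  # epoch timestamps in milliseconds, as strings
s = s.str[:-3].astype(float)                       # drop the 3 millisecond digits -> seconds
t = pd.to_datetime(s, unit='s')                    # naive datetimes, implicitly UTC
t = t.dt.tz_localize(pytz.utc).dt.tz_convert('Asia/Shanghai')
print(t.dt.strftime('%Y-%m-%d %H:%M:%S').tolist())
# ['2022-04-15 13:20:00', '2022-04-15 13:35:00']
```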

+ 80 - 0
db-wind/getdata/splitdata.py

@@ -0,0 +1,80 @@
+import pandas as pd
+import utils.savedata
+
+
+def align_nwp_power():
+    # read the large file
+    # large_file = "power_15min.csv"
+    # large_df = utils.savedata.readData(large_file)
+    #
+    # # read each small file into a dict
+    # small_files = []
+    # for i in range(6):
+    #     small_files.append("/Dataset_training/NWP/NWP_{}.csv".format(i))
+    # small_dfs = {}
+    #
+    # for file in small_files:
+    #     small_dfs[file] = utils.savedata.readData(file)
+    #
+    # # split the large file by each small file's time range
+    # i = 0
+    # for file, df in small_dfs.items():
+    #     min_time = df["C_TIME"].min()
+    #     max_time = df["C_TIME"].max()
+    #     splitted_df = large_df[(large_df["C_TIME"] >= min_time) & (large_df["C_TIME"] <= max_time)]
+    #     utils.savedata.saveData("/Dataset_training/power/power_{}.csv".format(i), splitted_df)
+    #     i = i + 1
+
+    filenames = ["Dataset_training/NWP/NWP_0.csv","Dataset_training/power/power_0.csv"]
+    dataframes = []
+    for name in filenames:
+        dataframes.append(utils.savedata.readData(name))
+
+    # find the latest start time and the earliest end time
+    max_start_time = max(df['C_TIME'].min() for df in dataframes)
+    min_end_time = min(df['C_TIME'].max() for df in dataframes)
+
+    print(max_start_time)
+    print(min_end_time)
+
+    # trim each DataFrame to the interval [max_start_time, min_end_time]
+    for i, df in enumerate(dataframes):
+        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # make sure the time column is datetime
+        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
+
+        # overwrite the original file with the trimmed data
+        utils.savedata.saveData(filenames[i], df_filtered)
+
+def split_test():
+    # read the NWP_0.csv and power_0.csv files
+    nwp_df = utils.savedata.readData("Dataset_training/NWP/NWP_0.csv")
+    power_df = utils.savedata.readData("Dataset_training/power/power_0.csv")
+
+    small_files = []
+    for i in range(6):
+        small_files.append("/Dataset_training/NWP/NWP_{}.csv".format(i))
+    for i in range(6):
+        small_files.append("/Dataset_training/power/power_{}.csv".format(i))
+    dataframes = []
+    for name in small_files:
+        dataframes.append(utils.savedata.readData(name))
+    small_dfs = {}
+    l = 0
+    for df in dataframes:
+        l = l + len(df)
+    test_size = int(l* 0.1)
+
+    nwp_test = nwp_df.iloc[-test_size:]
+    power_test = power_df.iloc[-test_size:]
+
+    nwp_train = nwp_df[~nwp_df["C_TIME"].isin(nwp_test["C_TIME"])]
+    power_train = power_df[~power_df["C_TIME"].isin(power_test["C_TIME"])]
+
+    utils.savedata.saveData("/Dataset_test/NWP/NWP_test.csv", nwp_test)
+    utils.savedata.saveData("/Dataset_test/power/power_test.csv", power_test)
+
+    utils.savedata.saveData("/Dataset_training/NWP/NWP_5.csv", nwp_train)
+    utils.savedata.saveData("/Dataset_training/power/power_5.csv", power_train)
+if __name__ == '__main__':
+    align_nwp_power()
+    #split_test()
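
The trimming in align_nwp_power() boils down to: keep only the rows inside the intersection of all files' time ranges. A generic sketch of that idea (a hypothetical helper, not part of this commit; the column name C_TIME follows this repo's convention):

```python
import pandas as pd

def trim_to_common_range(dfs, time_col="C_TIME"):
    # parse once so min/max and the comparisons all operate on datetimes
    for df in dfs:
        df[time_col] = pd.to_datetime(df[time_col])
    start = max(df[time_col].min() for df in dfs)  # latest start
    end = min(df[time_col].max() for df in dfs)    # earliest end
    return [df[(df[time_col] >= start) & (df[time_col] <= end)] for df in dfs]
```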

+ 221 - 0
db-wind/main.py

@@ -0,0 +1,221 @@
+import pandas as pd
+import os
+from datetime import timedelta
+from getdata import inputData
+from utils import Arg
+#—————————————— cleaning of -99 values and frozen-value streaks in the turbine wind speed ——————————————
+def mark_abnormal_streaks(df, columns, min_streak):
+    abnormal_mask = pd.Series(False, index=df.index)
+    streak_start = None
+    for i in range(len(df)):
+        # a new streak starts whenever any of the watched columns changes value
+        if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
+            streak_start = i
+
+        # once the streak reaches min_streak rows, flag the whole streak
+        if i - streak_start >= min_streak - 1:
+            abnormal_mask[i - min_streak + 1:i + 1] = True
+
+    return abnormal_mask
+
+
+def remove_abnormal_values(df, N):
+    # flag rows where C_ACTIVE_POWER is -99 as anomalies
+    abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
+    count_abnormal1 = abnormal_mask1.sum()
+
+    # flag rows where C_WS, C_WD and C_ACTIVE_POWER stay unchanged for N consecutive rows
+    columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
+    abnormal_mask2 = mark_abnormal_streaks(df, columns, N)
+    count_abnormal2 = abnormal_mask2.sum()
+    # combined boolean mask of all anomalies
+    abnormal_mask = abnormal_mask1 | abnormal_mask2
+    # record the distinct values involved in the frozen streaks
+    removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
+    # drop the anomalous rows
+    df_clean = df[~abnormal_mask]
+    total_removed = abnormal_mask.sum()
+    return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values
+
+
+def process_csv_files(input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
+
+    for i in arg.turbineloc:
+        input_file = os.path.join(input_dir, f"turbine-{i}.csv")
+        output_file = os.path.join(output_dir, f"turbine-{i}.csv")
+
+        # read the csv file
+        df = pd.read_csv(input_file)
+
+        # remove anomalies and collect the statistics
+        df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = remove_abnormal_values(df, N)
+
+        # report the statistics
+        print(f"Processing file: {input_file}")
+        print(f"-99 anomalies removed: {count_abnormal1}")
+        print(f"frozen-streak anomalies removed: {count_abnormal2}")
+        print(f"total rows removed: {total_removed}")
+        print(f"values involved in the frozen streaks: {removed_continuous_values}\n")
+
+        # save the cleaned csv
+        df_clean.to_csv(output_file, index=False)
+#—————————————— aligning the timestamps across all turbines ——————————————
+def TimeMerge(input_dir, output_dir, M):
+    # read all csv files
+    files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in arg.turbineloc]
+    dataframes = [pd.read_csv(f) for f in files]
+
+    # intersect the C_TIME columns
+    c_time_intersection = set(dataframes[0]["C_TIME"])
+    for df in dataframes[1:]:
+        c_time_intersection.intersection_update(df["C_TIME"])
+
+    # keep only the rows whose C_TIME is in the intersection
+    filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)] for df in dataframes]
+
+    # write each filtered DataFrame to a new csv
+    os.makedirs(output_dir, exist_ok=True)
+    for (filtered_df, i) in zip(filtered_dataframes, arg.turbineloc):
+        filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
+
+#—————————————— filling missing points in the turbine data ——————————————
+def MissingPointProcessing(input_dir, output_dir, M, N):
+
+    # iterate over the turbine files
+    for k in arg.turbineloc:
+        file_name = input_dir + '/' + f"turbine-{k}.csv"
+        # file_name = os.path.join(input_dir, f"turbine-{k}.csv")
+        # read the csv
+        data = pd.read_csv(file_name, parse_dates=['C_TIME'])
+
+        # time difference between consecutive rows, in seconds
+        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
+
+        # rows that follow a gap of more than one 15-min step
+        missing_data_points = data[data['time_diff'] > 900]
+
+        # storage for the filled timestamps and values
+        filled_data = []
+
+        # report the start and size of each gap
+        print("Start times of the gaps:")
+        for index, row in missing_data_points.iterrows():
+            missing_start = row['C_TIME'] - timedelta(seconds=row['time_diff'])
+            missing_count = int(row['time_diff'] // 900) - 1
+
+            # if the gap is at most N points, fill it by linear interpolation  MBD: this filling code is verbose
+            if missing_count <= N:
+                prev_values = data.iloc[index - 1][['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
+                next_values = row[['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
+
+                for i in range(1, missing_count + 1):
+                    t = i / (missing_count + 1)
+                    filled_time = missing_start + timedelta(minutes=15 * i)
+
+                    filled_values = {
+                        'C_TIME': filled_time,
+                        'C_WS': prev_values['C_WS'] + (next_values['C_WS'] - prev_values['C_WS']) * t,
+                        'C_WD': prev_values['C_WD']+(next_values['C_WD']-prev_values['C_WD'])*t,
+                        'C_ACTIVE_POWER': prev_values['C_ACTIVE_POWER'] + (
+                                    next_values['C_ACTIVE_POWER'] - prev_values['C_ACTIVE_POWER']) * t,
+                    }
+
+                    # wrap the direction angle into the range [-180, 180)
+                    filled_values['C_WD'] = (filled_values['C_WD'] + 180) % 360 - 180
+
+                    filled_data.append(filled_values)
+                    print(f"填充的时间: {filled_time}, 填充的值: {filled_values}")
+
+            print(f"{missing_start} - 缺失的点的数量: {missing_count}")
+
+        # append the filled rows to the original data
+        filled_df = pd.DataFrame(filled_data)
+        data = pd.concat([data, filled_df], ignore_index=True)
+        # sort by time and reset the index
+        data = data.sort_values(by='C_TIME').reset_index(drop=True)
+
+        # report the total number of points still missing
+        missing_data_points = data[data['time_diff'] > 900]
+        print(f"Total missing points: {int(missing_data_points['time_diff'].sum() // 900) - len(missing_data_points)}")
+        data.drop(columns=['time_diff'], inplace=True)
+        os.makedirs(output_dir, exist_ok=True)
+        output_path_name = os.path.join(output_dir, f"turbine-{k}.csv")
+        print(output_path_name)
+        # save the interpolated file
+        data.to_csv(output_path_name, index=False)
+#—————————————— splitting each turbine's data into its longest continuous segments ——————————————
+def top_n_continuous_periods(data, n):
+    # walk the sorted rows, accumulating runs of exactly 900-second steps
+    continuous_periods = []
+    continuous_start = data['C_TIME'].iloc[0]
+    continuous_count = 1
+    for i in range(1, len(data)):
+        if data['time_diff'].iloc[i] == 900:
+            continuous_count += 1
+        else:
+            continuous_periods.append({
+                'start': continuous_start,
+                'end': data['C_TIME'].iloc[i - 1],
+                'count': continuous_count
+            })
+            continuous_start = data['C_TIME'].iloc[i]
+            continuous_count = 1
+
+    continuous_periods.append({
+        'start': continuous_start,
+        'end': data['C_TIME'].iloc[-1],
+        'count': continuous_count
+    })
+    continuous_periods.sort(key=lambda x: x['count'], reverse=True)
+    return continuous_periods[:n]
+def Continuous_Data(input_dir, output_dir, M, TopN):
+    # read the csv of each turbine
+    for k in arg.turbineloc:
+        path_dir = f"turbine-{k}.csv"
+        input_path = os.path.join(input_dir, path_dir)
+        data = pd.read_csv(input_path, parse_dates=['C_TIME'])
+        data = data.sort_values(by='C_TIME').reset_index(drop=True)
+        # time difference between consecutive rows, in seconds
+        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
+        # pick the Top N continuous segments
+        top_n = TopN
+        top_n_periods = top_n_continuous_periods(data, top_n)
+        data.drop(columns=['time_diff'], inplace=True)
+        # report size, start and end of each Top N segment
+        print(f"Top {top_n} continuous segments:")
+        for i, period in enumerate(top_n_periods):
+            print(f"{i + 1}. 开始时间: {period['start']} - 结束时间: {period['end']} - 数据量: {period['count']}")
+            output_file = f"turbine-{k}_{period['count']}.csv"
+            mask = (data['C_TIME'] >= period['start']) & (data['C_TIME'] <= period['end'])
+            filtered_df = data.loc[mask]
+            # replace period['count'] in the file name with the actual number of rows kept
+            output_file = output_file.replace(str(period['count']), str(filtered_df.shape[0]))
+            output_folder = f"Continuous_Turbine_Data_{period['count']}_{period['start'].strftime('%y-%m-%d-%H-%M')}_{period['end'].strftime('%y-%m-%d-%H-%M')}"
+            output_folder = os.path.join(output_dir, output_folder)
+            if not os.path.exists(output_folder):
+                os.makedirs(output_folder)
+            # save the extracted segment to a new csv file
+            filtered_df.to_csv(os.path.join(output_folder, output_file), index=False)
+            print(f"Processed {input_path}")
+
+
+if __name__ == "__main__":
+    arg = Arg.Arg()
+    # inputData.data_process(arg.database)
+    input_dir = "../data/turbine-15"  # input folder
+    output_dir = "../data/output_clean_csv_files"  # output folder
+    # Clean -99 values and frozen-value streaks in the turbine wind speed; the last parameter means 5 unchanged consecutive values count as anomalous.
+    # This step creates an "output_clean_csv_files" folder with every turbine's data; the parameter 50 is the turbine count + 1, and 5 means streaks of 5 identical points are treated as anomalies and removed.
+    # process_csv_files(input_dir, output_dir, 50, 5)
+    output_dir_time_Merge = "../data/output_filtered_csv_files"
+    # This step creates an "output_filtered_csv_files" folder: on top of the previous step, all turbines are aligned in time, each keeping only the common timestamps.
+    # TimeMerge(output_dir, output_dir_time_Merge, 50)
+    output_complete_data = "../data/complete_data"
+    # This step creates a "complete_data" folder: on top of the previous step, gaps of up to 10 time points are filled.
+    # MissingPointProcessing(output_dir_time_Merge, output_complete_data, 50, 10)
+    continuous_time = "continuous_data"
+    # This step creates a "continuous_data" folder: on top of the previous step, the Top 10 longest continuous segments of each turbine are extracted.
+    Continuous_Data(output_complete_data, continuous_time, 50, 10)
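
To see what the streak detector in main.py flags, a toy run (values invented; assumes db-wind/main.py is importable as a module):

```python
import pandas as pd
from main import mark_abnormal_streaks  # assumption: main.py is on the import path

df = pd.DataFrame({
    "C_WS":           [5.1, 5.1, 5.1, 5.1, 5.1, 6.0],
    "C_WD":           [10,  10,  10,  10,  10,  12 ],
    "C_ACTIVE_POWER": [100, 100, 100, 100, 100, 130],
})
mask = mark_abnormal_streaks(df, ["C_WS", "C_WD", "C_ACTIVE_POWER"], min_streak=5)
print(mask.tolist())  # [True, True, True, True, True, False]: the frozen 5-row streak is flagged
```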

+ 12 - 0
db-wind/utils/Arg.py

@@ -0,0 +1,12 @@
+class Arg:
+    def __init__(self):
+        # database URL
+        self.database = "mysql+pymysql://root:!QAZ2root@192.168.1.205:3306/ipfcst-laodian"
+        # data directory
+        self.dataloc = "../data/"
+        # directory for pickled variables
+        self.varloc = "../data/var/"
+        # wind tower IDs
+        self.towerloc = [1]
+        # turbine IDs
+        self.turbineloc = [i for i in range(102, 162)]

+ 39 - 0
db-wind/utils/savedata.py

@@ -0,0 +1,39 @@
+import pickle
+import pandas as pd
+from utils import Arg
+import os
+
+arg = Arg.Arg()
+
+def saveData(name, data):
+    """
+    Save a DataFrame as csv under arg.dataloc
+    :param name: file name
+    :param data: DataFrame
+    :return:
+    """
+    path = arg.dataloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    data.to_csv(path, index=False)
+
+def readData(name):
+    """
+    Read a csv from arg.dataloc into a DataFrame
+    :param name: file name
+    :return:
+    """
+    path = arg.dataloc + r"/" + name
+    return pd.read_csv(path)
+
+def saveVar(name, data):
+    """
+    Pickle a variable under arg.varloc
+    """
+    path = arg.varloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    with open(path, 'wb') as file:
+        pickle.dump(data, file)
+
+def readVar(name):
+    """
+    Load a pickled variable from arg.varloc
+    """
+    path = arg.varloc + r"/" + name
+    with open(path, "rb") as file:
+        split_NWP = pickle.load(file)
+    return split_NWP