David committed 1 month ago
commit
ee328d59c1

+ 10 - 0
.gitignore

@@ -0,0 +1,10 @@
+*.csv
+*.xls
+*.xlsx
+.ipynb_checkpoints/**
+**/__pycache__/
+*.py[cod]
+*$py.class
+*.zip
+**/.ipynb_checkpoints/
+.idea

+ 12 - 0
DataBase/db-light/README.md

@@ -0,0 +1,12 @@
+# Usage
+- Just run main.py
+
+The main job is to read the relevant data from the database, save it locally as CSV files, and apply some simple processing.
+The main functions are:
+
+- clear_data(): clears all .csv files under data; the code must be run once first to create the files
+- get_process_NWP(): fetches NWP data from the database and applies simple processing
+- get_process_weather(): fetches environment-monitor (weather station) data and applies simple processing
+- get_process_power(): fetches plant-level power data
+- get_process_dq(): fetches short-term forecast results
+- indep_process(): further processing: time alignment, etc.
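For orientation, here is how these pieces are wired together, mirroring DataBase/db-light/main.py below (arg.begin, arg.end and arg.database come from utils/Arg.py):

```python
import datetime
from getdata.inputData import DataBase
from utils import Arg

arg = Arg.Arg()
db = DataBase(begin=datetime.datetime.strptime(arg.begin, '%Y-%m-%d'),
              end=datetime.datetime.strptime(arg.end, '%Y-%m-%d'),
              database=arg.database)
db.data_process()  # exports power, dq, NWP and weather csv files, then runs indep_process()
```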

+ 77 - 0
DataBase/db-light/getdata/data_cleaning.py

@@ -0,0 +1,77 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/10/11 11:00
+# file: data_cleaning.py
+# author: David
+# company: shenyang JY
+import numpy as np
+
+
+def cleaning(df, name, cols=None, dup=True):
+    print("Cleaning {}...".format(name))
+    data = df.copy()
+    data = data_column_cleaning(data)
+    if dup:
+        data = rm_duplicated(data)
+    if cols is not None:
+        data = key_field_row_cleaning(data, cols)
+    else:
+        data = interpolation(data)
+    return data
+
+def data_column_cleaning(data, clean_value=(-9999.0, -99, -99.0)):
+    """
+    Column-level cleaning
+    :param data:
+    :param clean_value: sentinel values to treat as NaN
+    :return:
+    """
+    cols_pre = data.columns.to_list()
+    for val in clean_value:
+        data = data.replace(val, np.nan)
+    # Drop columns with fewer than 80% non-NaN values
+    data = data.dropna(axis=1, thresh=len(data) * 0.8)
+    # Drop columns whose values are all identical
+    data = data.loc[:, (data != data.iloc[0]).any()]
+    cols_late = data.columns.tolist()
+    if len(cols_pre) > len(cols_late):
+        print("Dropped columns: {}".format(set(cols_pre) - set(cols_late)))
+    return data
+
+
+def interpolation(data):
+    # Backfill the remaining NaNs (despite the name, no linear interpolation is done)
+    data = data.bfill()
+    return data
+
+
+def key_field_row_cleaning(data, cols):
+    """
+    Row-level cleaning on key fields: drop negative values containing '99' (e.g. -99, -9999) and drop nulls
+    :param data:
+    :param cols: list of key columns
+    :return:
+    """
+    rows_pre = len(data)
+    for col in cols:
+        data = data[~((data.loc[:, col] < 0) & (data.loc[:, col].astype(str).str.contains('99')))]
+        data = data[~data.loc[:, col].isnull()]
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("Rows removed:", rows_pre - rows_late)
+    return data
+
+def rm_duplicated(data):
+    """
+    Deduplicate by timestamp
+    :param data:
+    :return:
+    """
+    # Rows sharing C_TIME are averaged together
+    rows_pre = len(data)
+    data = data.groupby(by='C_TIME').mean()
+    data.reset_index(inplace=True)
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("Rows merged by timestamp dedup:", rows_pre - rows_late)
+    return data
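As a quick illustration, a minimal sketch of cleaning() on synthetic data (the values are made up, and it assumes the module is importable as getdata.data_cleaning from the db-light root). Note that data_column_cleaning turns -99 into NaN first, so the bad row is removed by the null check in the key-field filter:

```python
import pandas as pd
from getdata.data_cleaning import cleaning

df = pd.DataFrame({
    'C_TIME': ['2023-10-11 00:00', '2023-10-11 00:00', '2023-10-11 00:15',
               '2023-10-11 00:30', '2023-10-11 00:45'],
    'C_WS': [5.1, 5.3, -99.0, 4.8, 5.0],
})
out = cleaning(df, 'demo', cols=['C_WS'])
# The two 00:00 readings are averaged to 5.2, and the -99 reading at 00:15
# (turned into NaN by the column cleaning) is dropped by the key-field filter.
print(out)
```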

+ 268 - 0
DataBase/db-light/getdata/inputData.py

@@ -0,0 +1,268 @@
+import pandas as pd
+import datetime, time
+import yaml
+import os
+import pymysql
+from sqlalchemy import create_engine
+import pytz
+from getdata.data_cleaning import cleaning, rm_duplicated
+import utils.savedata
+current_path = os.path.dirname(__file__)
+dataloc = current_path + '/data/'
+weatherloc = [1]
+
+
+def readData(name):
+    """
+    Read a csv from dataloc
+    :param name: file name
+    :return: DataFrame
+    """
+    path = dataloc + r"/" + name
+    return pd.read_csv(path)
+
+
+def saveData(name, data):
+    """
+    Save a DataFrame as csv under dataloc
+    :param name: file name
+    :param data: DataFrame to save
+    :return:
+    """
+    path = dataloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    data.to_csv(path, index=False)
+
+
+def timestamp_to_datetime(ts):
+    local_timezone = pytz.timezone('Asia/Shanghai')
+    if type(ts) is not int:
+        raise ValueError("timestamp must be an integer")
+    if len(str(ts)) == 13:   # millisecond timestamp
+        dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    elif len(str(ts)) == 10:  # second timestamp
+        dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    else:
+        raise ValueError("unrecognized timestamp format")
+
+
+def timestr_to_timestamp(time_str):
+    """
+    Convert a time string to a millisecond timestamp (ints are passed through unchanged)
+    :param time_str: str in '%Y-%m-%d' or '%Y-%m-%d %H:%M:%S' form, or int
+    :return: int, millisecond timestamp
+    """
+    if isinstance(time_str, str):
+        if len(time_str) == 10:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
+            return int(round(time.mktime(dt.timetuple())) * 1000)
+        elif len(time_str) in {17, 18, 19}:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')   # strptime must match the string format exactly
+            return int(round(time.mktime(dt.timetuple())) * 1000)   # convert to a millisecond timestamp
+        else:
+            raise ValueError("time string length not supported!")
+    else:
+        return time_str
+
+
+class DataBase(object):
+    def __init__(self, begin, end, database):
+        self.begin = begin
+        self.end = end - pd.Timedelta(minutes=15)
+        self.begin_stamp = timestr_to_timestamp(str(begin))
+        self.end_stamp = timestr_to_timestamp(str(self.end))
+        self.database = database
+
+    def clear_data(self):
+        """
+        Delete every csv under dataloc
+        :return:
+        """
+        import glob
+        import os
+        folder_path = dataloc
+
+        # Collect all .csv paths recursively
+        csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
+
+        # Delete each .csv file
+        for file_path in csv_files:
+            os.remove(file_path)
+        # self.logger.info("all csv files removed")
+
+    def create_database(self):
+        """
+        Create the database connection
+        :return: sqlalchemy engine
+        """
+        engine = create_engine(self.database)
+        return engine
+
+    def exec_sql(self, sql, engine):
+        """
+        Fetch data from the database
+        :param sql: sql statement
+        :param engine: database engine
+        :return: DataFrame
+        """
+        df = pd.read_sql_query(sql, engine)
+        return df
+
+    def split_time(self, data):
+        data['C_TIME'] = pd.to_datetime(data["C_TIME"])
+        data.set_index('C_TIME', inplace=True)
+        data = data.sort_index().loc[self.begin: self.end]
+        data.reset_index(drop=False, inplace=True)
+        return data
+
+    def get_process_NWP(self):
+        """
+        Fetch NWP data from the database and apply simple processing
+        :return:
+        """
+        # NWP data
+        engine = self.create_database()
+        sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
+                  "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION,  " \
+                  "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
+                  "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \
+                  " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)   # NWP fields for solar
+        NWP = self.exec_sql(sql_NWP, engine)
+
+        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
+
+        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
+        NWP = cleaning(NWP, 'NWP')
+        # NWP = self.split_time(NWP)
+        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        utils.savedata.saveData("NWP.csv", NWP)
+        print("NWP data exported")
+        return NWP
+
+    def get_process_weather(self):
+        """
+        Fetch environment-monitor (weather station) data
+        :return:
+        """
+        engine = self.create_database()
+        print("Environment monitors available: {}".format(weatherloc))
+        for i in weatherloc:
+            # Columns we do not need
+            drop_columns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10", "C_STATUS", "C_IS_GENERATED","C_ABNORMAL_CODE"]
+
+            get_columns = []
+            # Query all column names of the table
+            result_set = self.exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine)
+            for name in result_set.iloc[:, 0]:
+                if name not in drop_columns:
+                    get_columns.append(name)
+
+            all_columns_str = ", ".join([f'{col}' for col in get_columns])
+
+            weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO=" + str(i) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)
+            weather = self.exec_sql(weather_sql, engine)
+            weather['C_TIME'] = pd.to_datetime(weather['C_TIME'])
+            # weather = self.split_time(weather)
+            utils.savedata.saveData("/weather-{}.csv".format(i), weather)
+            print("Environment monitor {} exported".format(i))
+
+    def get_process_power(self):
+        """
+        Fetch plant-level power data
+        :return:
+        """
+        engine = self.create_database()
+        sql_cap = "select C_CAPACITY from t_electric_field"
+        cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
+        sql_power = "select C_TIME, C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
+                    " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
+        # if self.opt.usable_power["clean_power_by_signal"]:
+        #     sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and  C_IS_RATIONING_BY_AUTO_CONTROL=0"
+        powers = self.exec_sql(sql_power, engine)
+        mask1 = powers.loc[:, 'C_REAL_VALUE'].astype(float) > float(cap)   # above installed capacity
+        mask = powers['C_REAL_VALUE'] == -99                               # -99 sentinel
+
+        mask = mask | mask1
+        print("{} power rows in total, {} to be removed".format(len(powers), mask.sum()))
+        powers = powers[~mask]
+        print("{} rows remain after removal".format(len(powers)))
+        powers.reset_index(drop=True, inplace=True)
+        binary_map = {b'\x00': 0, b'\x01': 1}   # decode BIT(1) bytes to ints
+        powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
+        powers = rm_duplicated(powers)
+        utils.savedata.saveData("power.csv", powers)
+
+    def get_process_dq(self):
+        """
+        Fetch short-term forecast results
+        :return:
+        """
+        engine = self.create_database()
+        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
+                 "where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        dq = self.exec_sql(sql_dq, engine)
+        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
+        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
+        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
+        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
+        dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
+        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        utils.savedata.saveData("dq.csv", dq)
+        print("dq data exported")
+
+    def get_process_cdq(self):
+        """
+        Fetch ultra-short-term forecast results
+        :return:
+        """
+        engine = self.create_database()
+        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from " \
+                  "t_forecast_power_ultra_short_term_his" \
+                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        cdq = self.exec_sql(sql_cdq, engine)
+        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
+        cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
+        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
+        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        utils.savedata.saveData("cdq.csv", cdq)
+
+    def indep_process(self):
+        """
+        Further processing: time alignment, etc.
+        :return:
+        """
+        # Environment-monitor data processing
+        for i in weatherloc:
+            weather = utils.savedata.readData("/weather-{}.csv".format(i))
+            weather = cleaning(weather, 'weather', cols=['C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_RH', 'C_AIRT', 'C_P', 'C_WS', 'C_WD'])
+            utils.savedata.saveData("/weather-{}-process.csv".format(i), weather)
+
+    def data_process(self):
+        """
+        Orchestrates the export plus initial processing steps
+        :return:
+        """
+        # self.clear_data()
+        try:
+            self.get_process_power()
+            self.get_process_dq()
+            # self.get_process_cdq()
+            self.get_process_NWP()
+            self.get_process_weather()
+            self.indep_process()
+        except Exception as e:
+            print("Data export failed: {}".format(e.args))
+
+
+
+
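A quick sanity check of the two timestamp helpers defined at the top of this file, as a minimal sketch (the example value is arbitrary; the exact integer depends on the local timezone):

```python
from getdata.inputData import timestr_to_timestamp, timestamp_to_datetime

# '%Y-%m-%d %H:%M:%S' (or a bare '%Y-%m-%d') string -> millisecond timestamp
ms = timestr_to_timestamp('2023-11-01 00:00:00')
print(ms)  # e.g. 1698768000000 on a UTC+8 machine

# 13-digit (ms) or 10-digit (s) int -> Asia/Shanghai-localized datetime
print(timestamp_to_datetime(ms))          # 2023-11-01 00:00:00+08:00
print(timestamp_to_datetime(ms // 1000))  # same instant, from the second-precision form
```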

+ 80 - 0
DataBase/db-light/getdata/splitdata.py

@@ -0,0 +1,80 @@
+import pandas as pd
+import utils.savedata
+
+
+def align_time_ranges():
+    # Split a large file by the time ranges of smaller files (kept for reference):
+    # large_file = "power_15min.csv"
+    # large_df = utils.savedata.readData(large_file)
+    #
+    # # Read each small file into a dict
+    # small_files = []
+    # for i in range(6):
+    #     small_files.append("/Dataset_training/NWP/NWP_{}.csv".format(i))
+    # small_dfs = {}
+    #
+    # for file in small_files:
+    #     small_dfs[file] = utils.savedata.readData(file)
+    #
+    # # Split the large file according to each small file's time range
+    # i = 0
+    # for file, df in small_dfs.items():
+    #     min_time = df["C_TIME"].min()
+    #     max_time = df["C_TIME"].max()
+    #     splitted_df = large_df[(large_df["C_TIME"] >= min_time) & (large_df["C_TIME"] <= max_time)]
+    #     utils.savedata.saveData("/Dataset_training/power/power_{}.csv".format(i), splitted_df)
+    #     i = i + 1
+
+    filenames = ["Dataset_training/NWP/NWP_0.csv", "Dataset_training/power/power_0.csv"]
+    dataframes = []
+    for name in filenames:
+        dataframes.append(utils.savedata.readData(name))
+
+    # Find the latest start time and the earliest end time across the files
+    max_start_time = max(df['C_TIME'].min() for df in dataframes)
+    min_end_time = min(df['C_TIME'].max() for df in dataframes)
+
+    print(max_start_time)
+    print(min_end_time)
+
+    # Trim each DataFrame to the common window [max_start_time, min_end_time]
+    for i, df in enumerate(dataframes):
+        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # make sure the time column is datetime
+        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
+
+        # Write the trimmed data back to the original file
+        utils.savedata.saveData(filenames[i], df_filtered)
+
+def split_test():
+    # Read the NWP_0.csv and power_0.csv files
+    nwp_df = utils.savedata.readData("Dataset_training/NWP/NWP_0.csv")
+    power_df = utils.savedata.readData("Dataset_training/power/power_0.csv")
+
+    # The total row count across all NWP/power splits decides the test size
+    small_files = []
+    for i in range(6):
+        small_files.append("/Dataset_training/NWP/NWP_{}.csv".format(i))
+    for i in range(6):
+        small_files.append("/Dataset_training/power/power_{}.csv".format(i))
+    dataframes = []
+    for name in small_files:
+        dataframes.append(utils.savedata.readData(name))
+    total_rows = 0
+    for df in dataframes:
+        total_rows = total_rows + len(df)
+    test_size = int(total_rows * 0.1)
+
+    nwp_test = nwp_df.iloc[-test_size:]
+    power_test = power_df.iloc[-test_size:]
+
+    nwp_train = nwp_df[~nwp_df["C_TIME"].isin(nwp_test["C_TIME"])]
+    power_train = power_df[~power_df["C_TIME"].isin(power_test["C_TIME"])]
+
+    utils.savedata.saveData("/Dataset_test/NWP/NWP_test.csv", nwp_test)
+    utils.savedata.saveData("/Dataset_test/power/power_test.csv", power_test)
+
+    utils.savedata.saveData("/Dataset_training/NWP/NWP_5.csv", nwp_train)
+    utils.savedata.saveData("/Dataset_training/power/power_5.csv", power_train)
+
+if __name__ == '__main__':
+    align_time_ranges()
+    # split_test()
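The heart of align_time_ranges() is the common-window computation; a standalone sketch on synthetic frames:

```python
import pandas as pd

df1 = pd.DataFrame({'C_TIME': pd.date_range('2023-01-01 00:00', periods=5, freq='15min')})
df2 = pd.DataFrame({'C_TIME': pd.date_range('2023-01-01 00:30', periods=5, freq='15min')})

frames = [df1, df2]
start = max(df['C_TIME'].min() for df in frames)  # latest start: 00:30
end = min(df['C_TIME'].max() for df in frames)    # earliest end: 01:00
trimmed = [df[(df['C_TIME'] >= start) & (df['C_TIME'] <= end)] for df in frames]
print([len(t) for t in trimmed])  # [3, 3] -> both keep 00:30, 00:45, 01:00
```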

+ 10 - 0
DataBase/db-light/main.py

@@ -0,0 +1,10 @@
+import datetime
+from getdata.inputData import DataBase
+from utils import Arg
+
+if __name__ == "__main__":
+    arg = Arg.Arg()
+    db = DataBase(begin=datetime.datetime.strptime(arg.begin, '%Y-%m-%d'),
+                  end=datetime.datetime.strptime(arg.end, '%Y-%m-%d'),
+                  database=arg.database)
+    db.data_process()

+ 54 - 0
DataBase/db-light/norm.py

@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/6/25 10:28
+# file: norm.py
+# author: David
+# company: shenyang JY
+import os.path
+
+import numpy as np
+import pandas as pd
+import yaml
+
+
+class Normalize(object):
+    def __init__(self):
+        self.mean = {}
+        self.std = {}
+
+    def normalize(self, df):
+        """
+        Compute per-column mean/std (C_TIME is not normalized for now)
+        :param df:
+        :return:
+        """
+        if 'C_TIME' in df.columns:
+            df.drop('C_TIME', axis=1, inplace=True)
+        mean_dict = np.mean(df, axis=0)  # column means
+        std_dict = np.std(df, axis=0)  # column standard deviations
+        for k, v in mean_dict.to_dict().items():
+            self.mean[k] = round(v, 3)
+        for k, v in std_dict.to_dict().items():
+            self.std[k] = round(v, 3)
+        print("Normalization params - mean: {}, std: {}".format(self.mean, self.std))
+
+    def save_yml(self, yml_dict, path):
+        cfg = {}
+        for k, v in yml_dict.items():
+            cfg[k] = v
+        with open(path, 'w') as f:
+            yaml.safe_dump(cfg, f, default_flow_style=False)
+
+
+if __name__ == '__main__':
+    norm = Normalize()
+    p = r'F:\ligh-power\data\J00226\rp.xls'
+    en = r'F:\ligh-power\data\J00226\envir.xls'
+    nwp = r'F:\ligh-power\data\J00226\nwp.xls'
+    power = pd.read_excel(p)
+    en = pd.read_excel(en)
+    nwp = pd.read_excel(nwp)
+    norm.normalize(power.loc[:, ['C_VALUE']])
+    norm.normalize(en.loc[:, 'C_RH':'C_P'])
+    norm.normalize(nwp.loc[:, 'C_DIRECTION10':'C_TEMPERATURE90'])
+    norm.save_yml({'mean': norm.mean, 'std': norm.std}, path=r'F:\light-CDQ\data\J00226\norm.yaml')
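A minimal, self-contained sketch of how Normalize is meant to be used (synthetic column; the output yaml name is hypothetical):

```python
import pandas as pd
from norm import Normalize

df = pd.DataFrame({'C_VALUE': [1.0, 2.0, 3.0, 4.0]})
norm = Normalize()
norm.normalize(df)  # records mean=2.5 and std=1.118 (population std) for C_VALUE
norm.save_yml({'mean': norm.mean, 'std': norm.std}, path='norm_demo.yaml')
```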

+ 15 - 0
DataBase/db-light/utils/Arg.py

@@ -0,0 +1,15 @@
+class Arg:
+    def __init__(self):
+        # Data storage location
+        self.dataloc = "../data/526"
+        # Variable (pickle) storage location
+        self.varloc = "../data/526/var"
+        # Number of environment monitors
+        self.weatherloc = [1]
+        # Normalization yaml location
+        self.normloc = '../data/526/norm.yaml'
+
+        self.begin = '2023-11-01'
+        self.end = '2024-05-31'
+        # Database address
+        self.database = "mysql+pymysql://root:mysql_T7yN3E@192.168.12.10:19306/ipfcst_j00526_20240527101346"

+ 39 - 0
DataBase/db-light/utils/savedata.py

@@ -0,0 +1,39 @@
+import pickle
+import pandas as pd
+from utils import Arg
+import os
+
+arg = Arg.Arg()
+
+def saveData(name, data):
+    """
+    Save a DataFrame as csv under dataloc
+    :param name: file name
+    :param data: DataFrame to save
+    :return:
+    """
+    path = arg.dataloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    data.to_csv(path, index=False)
+
+def readData(name):
+    """
+    Read a csv from dataloc
+    :param name: file name
+    :return: DataFrame
+    """
+    path = arg.dataloc + r"/" + name
+    return pd.read_csv(path)
+
+def saveVar(name, data):
+    # Pickle an arbitrary object under varloc
+    path = arg.varloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    with open(path, 'wb') as file:
+        pickle.dump(data, file)
+
+def readVar(name):
+    # Load a pickled object from varloc
+    path = arg.varloc + r"/" + name
+    with open(path, "rb") as file:
+        obj = pickle.load(file)
+    return obj
+
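A quick round trip through the pickle helpers (hypothetical file name; files land under arg.varloc):

```python
from utils import savedata

stats = {'mean': 2.5, 'std': 1.118}
savedata.saveVar('stats.pickle', stats)
assert savedata.readVar('stats.pickle') == stats
```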

+ 16 - 0
DataBase/db-wind/Arg.py

@@ -0,0 +1,16 @@
+class Arg:
+    def __init__(self):
+        # Database address
+        self.database = "mysql+pymysql://root:mysql_T7yN3E@192.168.12.10:19306/ipfcst_j00314_20240516121839"
+        # Data storage location
+        self.dataloc = "../data/314"
+        # Variable (pickle) storage location
+        self.varloc = "../data/314/var"
+        # Number of wind measurement towers
+        self.towerloc = [1]
+        # Turbine ids
+        self.turbineloc = list(range(1, 16))
+
+        self.begin = '2023-03-01'
+        self.end = '2024-03-31'
+

+ 12 - 0
DataBase/db-wind/README.md

@@ -0,0 +1,12 @@
+# Usage
+- Just run main.py
+
+The main job is to read the relevant data from the database, save it locally as CSV files, and apply some simple processing.
+The main functions are:
+
+- clear_data(): clears all .csv files under data; the code must be run once first to create the files
+- get_process_NWP(): fetches NWP data from the database and applies simple processing
+- get_process_turbine(): fetches wind-turbine data from the database and applies simple processing
+- get_process_tower(): fetches wind-measurement-tower data
+- get_process_power(): fetches plant-level power data
+- indep_process(): further processing: time alignment, etc.

+ 79 - 0
DataBase/db-wind/data_cleaning.py

@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/10/11 11:00
+# file: data_cleaning.py
+# author: David
+# company: shenyang JY
+import numpy as np
+
+
+def cleaning(df, name, cols=None, dup=True):
+    print("Cleaning {}...".format(name))
+    data = df.copy()
+    if len(data) > 0:
+        data = data_column_cleaning(data)
+    if dup:
+        data = rm_duplicated(data)
+    if cols is not None:
+        data = key_field_row_cleaning(data, cols)
+    else:
+        data = interpolation(data)
+    return data
+
+def data_column_cleaning(data, clean_value=(-9999.0, -99, -99.0)):
+    """
+    Column-level cleaning
+    :param data:
+    :param clean_value: sentinel values to treat as NaN
+    :return:
+    """
+    cols_pre = data.columns.to_list()
+    for val in clean_value:
+        data = data.replace(val, np.nan)
+    # Drop columns with fewer than 80% non-NaN values
+    data = data.dropna(axis=1, thresh=len(data) * 0.8)
+    # Drop columns whose values are all identical
+    data = data.loc[:, (data != data.iloc[0]).any()]
+    cols_late = data.columns.tolist()
+    if len(cols_pre) > len(cols_late):
+        print("Dropped columns: {}".format(set(cols_pre) - set(cols_late)))
+    return data
+
+
+def interpolation(data):
+    # Backfill the remaining NaNs (despite the name, no linear interpolation is done)
+    data = data.bfill()
+    return data
+
+
+def key_field_row_cleaning(data, cols):
+    """
+    Row-level cleaning on key fields: drop negative values containing '99' (e.g. -99, -9999) and drop nulls
+    :param data:
+    :param cols: list of key columns
+    :return:
+    """
+    rows_pre = len(data)
+    for col in cols:
+        if col in data.columns.tolist():
+            data = data[~((data.loc[:, col] < 0) & (data.loc[:, col].astype(str).str.contains('99')))]
+            data = data[~data.loc[:, col].isnull()]
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("Rows removed:", rows_pre - rows_late)
+    return data
+
+def rm_duplicated(data):
+    """
+    Deduplicate by timestamp
+    :param data:
+    :return:
+    """
+    # Rows sharing C_TIME are averaged together
+    rows_pre = len(data)
+    data = data.groupby(by='C_TIME').mean()
+    data.reset_index(inplace=True)
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("Rows merged by timestamp dedup:", rows_pre - rows_late)
+    return data

+ 293 - 0
DataBase/db-wind/inputData.py

@@ -0,0 +1,293 @@
+import pandas as pd
+import datetime, time
+import pytz
+from savedata import saveData, readData
+from sqlalchemy import create_engine
+from data_cleaning import cleaning, rm_duplicated
+
+
+def timestamp_to_datetime(ts):
+    local_timezone = pytz.timezone('Asia/Shanghai')
+    if type(ts) is not int:
+        raise ValueError("timestamp must be an integer")
+    if len(str(ts)) == 13:   # millisecond timestamp
+        dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    elif len(str(ts)) == 10:  # second timestamp
+        dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    else:
+        raise ValueError("unrecognized timestamp format")
+
+
+def timestr_to_timestamp(time_str):
+    """
+    Convert a time string to a millisecond timestamp (ints are passed through unchanged)
+    :param time_str: str in '%Y-%m-%d' or '%Y-%m-%d %H:%M:%S' form, or int
+    :return: int, millisecond timestamp
+    """
+    if isinstance(time_str, str):
+        if len(time_str) == 10:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
+            return int(round(time.mktime(dt.timetuple())) * 1000)
+        elif len(time_str) in {17, 18, 19}:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')   # strptime must match the string format exactly
+            return int(round(time.mktime(dt.timetuple())) * 1000)   # convert to a millisecond timestamp
+        else:
+            raise ValueError("time string length not supported!")
+    else:
+        return time_str
+
+
+class DataBase(object):
+    def __init__(self, arg):
+        self.begin = datetime.datetime.strptime(arg.begin, '%Y-%m-%d')
+        self.end = datetime.datetime.strptime(arg.end, '%Y-%m-%d') - pd.Timedelta(minutes=15)
+        self.begin_stamp = timestr_to_timestamp(str(arg.begin))
+        self.end_stamp = timestr_to_timestamp(str(self.end))
+        self.database = arg.database
+        self.towerloc = arg.towerloc
+        self.turbineloc = arg.turbineloc
+        self.dataloc = arg.dataloc
+
+    def clear_data(self):
+        """
+        Delete every csv under dataloc
+        :return:
+        """
+        import glob
+        import os
+        folder_path = self.dataloc
+
+        # Collect all .csv paths recursively
+        csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
+
+        # Delete each .csv file
+        for file_path in csv_files:
+            os.remove(file_path)
+        print("All csv files removed")
+
+    def create_database(self):
+        """
+        Create the database connection
+        :return: sqlalchemy engine
+        """
+        engine = create_engine(self.database)
+        return engine
+
+    def exec_sql(self, sql, engine):
+        """
+        Fetch data from the database
+        :param sql: sql statement
+        :param engine: database engine
+        :return: DataFrame
+        """
+        df = pd.read_sql_query(sql, engine)
+        return df
+
+    def split_time(self, data):
+        data.set_index('C_TIME', inplace=True)
+        data = data.sort_index().loc[self.begin: self.end]
+        data.reset_index(drop=False, inplace=True)
+        return data
+
+    def get_process_NWP(self):
+        """
+        Fetch NWP data from the database and apply simple processing
+        :return:
+        """
+        # NWP data
+        engine = self.create_database()
+        sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
+                  "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION,  " \
+                  "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
+                  "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp" \
+                  " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # NWP fields for wind
+        NWP = self.exec_sql(sql_NWP, engine)
+
+        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
+
+        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
+        NWP = cleaning(NWP, 'NWP')
+        # NWP = self.split_time(NWP)
+        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("NWP.csv", NWP)
+        return NWP
+
+    def get_process_tower(self):
+        """
+        Fetch wind-measurement-tower data
+        :return:
+        """
+        engine = self.create_database()
+        print("Extracting towers: {}".format(self.towerloc))
+        for i in self.towerloc:
+            # Columns we do not need
+            drop_columns = ["C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"]
+
+            get_columns = []
+            # Query all column names of the table
+            result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
+            for name in result_set.iloc[:, 0]:
+                if name not in drop_columns:
+                    get_columns.append(name)
+
+            all_columns_str = ", ".join([f'{col}' for col in get_columns])
+            tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO=" + str(i) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)
+            tower = self.exec_sql(tower_sql, engine)
+            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
+            saveData("/tower-{}.csv".format(i), tower)
+            print("Tower {} exported".format(i))
+
+
+    def get_process_power(self):
+        """
+        Fetch plant-level power data
+        :return:
+        """
+        engine = self.create_database()
+        sql_cap = "select C_CAPACITY from t_electric_field"
+        cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
+        sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
+                    " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
+        # if self.opt.usable_power["clean_power_by_signal"]:
+        #     sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and  C_IS_RATIONING_BY_AUTO_CONTROL=0"
+        powers = self.exec_sql(sql_power, engine)
+        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
+        mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap)   # above installed capacity
+        mask = powers['C_REAL_VALUE'] == -99                        # -99 sentinel
+
+        mask = mask | mask1
+        print("{} power rows in total, {} to be removed".format(len(powers), mask.sum()))
+        powers = powers[~mask]
+        print("{} rows remain after removal".format(len(powers)))
+        binary_map = {b'\x00': 0, b'\x01': 1}   # decode BIT(1) bytes to ints
+        powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
+        powers = rm_duplicated(powers)
+        saveData("power.csv", powers)
+
+    def get_process_dq(self):
+        """
+        Fetch short-term forecast results
+        :return:
+        """
+        engine = self.create_database()
+        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
+                 "where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        dq = self.exec_sql(sql_dq, engine)
+        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
+        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
+        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
+        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
+        dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
+        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("dq.csv", dq)
+
+    def indep_process(self):
+        """
+        Further processing: time alignment, etc.
+        :return:
+        """
+        # Wind-tower data processing
+        for i in self.towerloc:
+            tower = readData("/tower-{}.csv".format(i))
+            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
+            # Check whether each column is entirely -99
+            all_minus_99 = (tower == -99).all()
+
+            # Collect the names of the all--99 columns
+            cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
+
+            # Drop them
+            tower = tower.drop(cols_to_drop, axis=1)
+            tower = cleaning(tower, 'tower', ['C_WS_INST_HUB_HEIGHT'])
+            saveData("/tower-{}-process.csv".format(i), tower)
+
+    def get_process_cdq(self):
+        """
+        Fetch ultra-short-term forecast results
+        :return:
+        """
+        engine = self.create_database()
+        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \
+                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        cdq = self.exec_sql(sql_cdq, engine)
+        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
+        cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
+        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
+        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("cdq.csv", cdq)
+
+    def get_process_turbine(self):
+        """
+        Fetch wind-turbine data from the database and apply simple processing
+        :return:
+        """
+        for number in self.turbineloc:
+            # number = self.opt.usable_power['turbine_id']
+            # Turbine data
+            engine = self.create_database()
+
+            print("Exporting turbine {}".format(number))
+            sql_turbine = "select C_TIME, C_WS, C_WD, C_ACTIVE_POWER from t_wind_turbine_status_data " \
+                          "WHERE C_EQUIPMENT_NO=" + str(number) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)    # + " and C_WS>0 and C_ACTIVE_POWER>0"
+            turbine = self.exec_sql(sql_turbine, engine)
+            turbine = cleaning(turbine, 'turbine', cols=['C_WS', 'C_ACTIVE_POWER'], dup=False)
+            turbine['C_TIME'] = pd.to_datetime(turbine['C_TIME'])
+            # Keep only quarter-hour points
+            turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
+            # Export all the data directly
+            saveData("turbine-15/turbine-{}.csv".format(number), turbine)
+
+
+
+    def data_process(self):
+        """
+        Orchestrates the export plus initial processing steps
+        :return:
+        """
+        self.clear_data()
+        try:
+            # self.get_process_power()
+            # self.get_process_dq()
+            # self.get_process_cdq()
+            # self.get_process_NWP()
+            # self.get_process_tower()
+            self.get_process_turbine()
+            self.indep_process()
+        except Exception as e:
+            print("Data export failed: {}".format(e.args))
+
+
+
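One detail worth noting in get_process_power() above: MySQL BIT(1) columns generally come back from pandas.read_sql_query as one-byte bytes objects, which is why the code maps b'\x00'/b'\x01' to ints. A minimal sketch of that decode step:

```python
import pandas as pd

flags = pd.Series([b'\x00', b'\x01', b'\x01'])  # raw BIT(1) values as returned by the driver
binary_map = {b'\x00': 0, b'\x01': 1}
print(flags.map(binary_map).tolist())  # [0, 1, 1]
```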

+ 225 - 0
DataBase/db-wind/main.py

@@ -0,0 +1,225 @@
+import pandas as pd
+import os
+from datetime import timedelta
+from inputData import DataBase
+import Arg
+#—————————————— Cleaning of -99 readings and consecutive stuck values in turbine wind speed ——————————————
+def mark_abnormal_streaks(df, columns, min_streak):
+    abnormal_mask = pd.Series(False, index=df.index)
+    streak_start = None
+    for i in range(len(df)):
+        if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
+            streak_start = i
+
+        if i - streak_start >= min_streak - 1:
+            abnormal_mask[i - min_streak + 1:i + 1] = True
+
+    return abnormal_mask
+
+
+def remove_abnormal_values(df, N):
+    # Flag rows where C_ACTIVE_POWER is -99 as anomalous
+    abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
+    count_abnormal1 = abnormal_mask1.sum()
+
+    # Flag rows where C_WS, C_WD and C_ACTIVE_POWER stay unchanged for N consecutive rows
+    columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
+    abnormal_mask2 = mark_abnormal_streaks(df, columns, N)
+    count_abnormal2 = abnormal_mask2.sum()
+    # Combine both masks
+    abnormal_mask = abnormal_mask1 | abnormal_mask2
+    # Record the actual values involved in the stuck runs
+    removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
+    # Drop the anomalous rows
+    df_clean = df[~abnormal_mask]
+    total_removed = abnormal_mask.sum()
+    return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values
+
+
+def process_csv_files(input_dir, output_dir, M, N):  # MBD: duplicated timestamps are not handled
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
+
+    for i in arg.turbineloc:
+        input_file = os.path.join(input_dir, f"turbine-{i}.csv")
+        output_file = os.path.join(output_dir, f"turbine-{i}.csv")
+
+        # Read the csv file
+        df = pd.read_csv(input_file)
+
+        # Drop anomalies and collect the statistics
+        df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = remove_abnormal_values(df, N)
+
+        # Report the statistics
+        print(f"Processing file: {input_file}")
+        print(f"-99 anomalies removed: {count_abnormal1}")
+        print(f"Consecutive stuck values removed: {count_abnormal2}")
+        print(f"Total rows removed: {total_removed}")
+        print(f"Stuck values involved: {removed_continuous_values}\n")
+
+        # Save the processed csv
+        df_clean.to_csv(output_file, index=False)
+#—————————————— Per-turbine time alignment ——————————————
+def TimeMerge(input_dir, output_dir, M):
+    # Read all csv files
+    files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in arg.turbineloc]
+    dataframes = [pd.read_csv(f) for f in files]
+
+    # Intersect the C_TIME columns
+    c_time_intersection = set(dataframes[0]["C_TIME"])
+    for df in dataframes[1:]:
+        c_time_intersection.intersection_update(df["C_TIME"])
+
+    # Keep only timestamps present in every file
+    filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)] for df in dataframes]
+
+    # Write each filtered DataFrame to a new csv
+    os.makedirs(output_dir, exist_ok=True)
+    for (filtered_df, i) in zip(filtered_dataframes, arg.turbineloc):
+        if i == 144:
+            filtered_df['C_ACTIVE_POWER'] /= 1000   # unit fix for turbine 144
+        filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
+
+#—————————————— Turbine missing-point handling ——————————————
+def MissingPointProcessing(input_dir, output_dir, M, N):
+
+    # Process the file of each turbine
+    for k in arg.turbineloc:
+        file_name = input_dir + '/' + f"turbine-{k}.csv"
+        # file_name = os.path.join(input_dir, f"turbine-{k}.csv")
+    # Read the csv file
+        data = pd.read_csv(file_name, parse_dates=['C_TIME'])
+
+    # Compute consecutive time differences
+        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
+
+    # Find the missing time points (gaps larger than 15 minutes)
+        missing_data_points = data[data['time_diff'] > 900]
+
+    # Collect the filled times and values
+        filled_data = []
+
+    # Report the start and size of each gap
+        print("Gap start times:")
+        for index, row in missing_data_points.iterrows():
+            missing_start = row['C_TIME'] - timedelta(seconds=row['time_diff'])
+            missing_count = int(row['time_diff'] // 900) - 1
+
+            # Fill gaps of at most N points by linear interpolation  MBD: the filling code is rather verbose
+            if missing_count <= N:
+                prev_values = data.iloc[index - 1][['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
+                next_values = row[['C_WS', 'C_WD', 'C_ACTIVE_POWER']]
+
+                for i in range(1, missing_count + 1):
+                    t = i / (missing_count + 1)
+                    filled_time = missing_start + timedelta(minutes=15 * i)
+
+                    filled_values = {
+                        'C_TIME': filled_time,
+                        'C_WS': prev_values['C_WS'] + (next_values['C_WS'] - prev_values['C_WS']) * t,
+                        'C_WD': prev_values['C_WD'] + (next_values['C_WD'] - prev_values['C_WD']) * t,
+                        'C_ACTIVE_POWER': prev_values['C_ACTIVE_POWER'] + (
+                                    next_values['C_ACTIVE_POWER'] - prev_values['C_ACTIVE_POWER']) * t,
+                    }
+
+                    # Wrap the wind direction into the [-180, 180) range
+                    filled_values['C_WD'] = (filled_values['C_WD'] + 180) % 360 - 180
+
+                    filled_data.append(filled_values)
+                    print(f"Filled time: {filled_time}, filled values: {filled_values}")
+
+            print(f"{missing_start} - missing points: {missing_count}")
+
+        # Insert the filled rows into the original data
+        filled_df = pd.DataFrame(filled_data)
+        data = pd.concat([data, filled_df], ignore_index=True)
+        # Sort by time and reset the index
+        data = data.sort_values(by='C_TIME').reset_index(drop=True)
+
+        # Report the total number of still-missing points
+        missing_data_points = data[data['time_diff'] > 900]
+        print(f"Total missing points: {int(missing_data_points['time_diff'].sum() // 900) - len(missing_data_points)}")
+        data.drop(columns=['time_diff'], inplace=True)
+        os.makedirs(output_dir, exist_ok=True)
+        output_path_name = os.path.join(output_dir, f"turbine-{k}.csv")
+        print(output_path_name)
+    # Save the interpolated file
+        data.to_csv(output_path_name, index=False)
+#—————————————— Splitting per-turbine continuous time segments ——————————————
+def top_n_continuous_periods(data, n):
+    continuous_periods = []
+    continuous_start = data['C_TIME'].iloc[0]
+    continuous_count = 1
+
+    for i in range(1, len(data)):
+        if data['time_diff'].iloc[i] == 900:
+            continuous_count += 1
+        else:
+            continuous_periods.append({
+                'start': continuous_start,
+                'end': data['C_TIME'].iloc[i - 1],
+                'count': continuous_count
+            })
+            continuous_start = data['C_TIME'].iloc[i]
+            continuous_count = 1
+
+    continuous_periods.append({
+        'start': continuous_start,
+        'end': data['C_TIME'].iloc[-1],
+        'count': continuous_count
+    })
+    continuous_periods.sort(key=lambda x: x['count'], reverse=True)
+    return continuous_periods[:n]
+def Continuous_Data(input_dir, output_dir, M, TopN):
+    # Read each turbine's csv file
+    for k in arg.turbineloc:
+        path_dir = f"turbine-{k}.csv"
+        input_path = os.path.join(input_dir, path_dir)
+        data = pd.read_csv(input_path, parse_dates=['C_TIME'])
+        data = data.sort_values(by='C_TIME').reset_index(drop=True)
+    # Compute consecutive time differences
+        data['time_diff'] = data['C_TIME'].diff().dt.total_seconds()
+    # Get the Top N continuous segments
+        top_n = TopN
+        top_n_periods = top_n_continuous_periods(data, top_n)
+        data.drop(columns=['time_diff'], inplace=True)
+    # Report the size, start time and end time of each Top N segment
+        print(f"Top {top_n} continuous segments:")
+        for i, period in enumerate(top_n_periods):
+            print(f"{i + 1}. start: {period['start']} - end: {period['end']} - rows: {period['count']}")
+            output_file = f"turbine-{k}_{period['count']}.csv"
+            mask = (data['C_TIME'] >= period['start']) & (data['C_TIME'] <= period['end'])
+            filtered_df = data.loc[mask]
+            # Replace period['count'] in the file name with the actual segment size
+            output_file = output_file.replace(str(period['count']), str(filtered_df.shape[0]))
+            output_folder = f"Continuous_Turbine_Data_{period['count']}_{period['start'].strftime('%y-%m-%d-%H-%M')}_{period['end'].strftime('%y-%m-%d-%H-%M')}"
+            output_folder = os.path.join(output_dir, output_folder)
+            if not os.path.exists(output_folder):
+                os.makedirs(output_folder)
+            # Save the extracted segment to a new csv file
+            # filtered_df.to_csv(output_dir, index=False)
+            filtered_df.to_csv(os.path.join(output_folder, output_file), index=False)
+            print(f"Processed {input_path}")
+
+if __name__ == "__main__":
+    arg = Arg.Arg()
+    db = DataBase(arg=arg)
+    db.data_process()
+    input_dir = "../data/314/turbine-15"  # input folder
+    output_dir = "../data/314/output_clean_csv_files"  # output folder
+    # Clean consecutive stuck values and -99 readings in the turbine wind speed; the last argument
+    # means 5 unchanged values in a row are treated as anomalous.
+    # This step creates an "output_clean_csv_files" folder holding all per-turbine data; only -99 is
+    # cleaned from the stored wind speed. The 50 is the number of turbines + 1, and the 5 flags any
+    # run of 5 identical points as anomalous and removes it entirely.
+    process_csv_files(input_dir, output_dir, 50, 5)
+    output_dir_time_Merge = "../data/314/output_filtered_csv_files"
+    # This step creates an "output_filtered_csv_files" folder; on top of the previous step it aligns
+    # all turbines in time, keeping only their common timestamps.
+    TimeMerge(output_dir, output_dir_time_Merge, 50)
+    # output_complete_data = "../data_mts/complete_data"
+    # This step creates a "complete_data" folder; on top of the previous step it fills gaps of up to 10 points.
+    # MissingPointProcessing(output_dir_time_Merge, output_complete_data, 50, 10)
+    # continuous_time = "../data_mts/continuous_data"
+    # # This step creates a "Continuous_data" folder; on top of the previous step it keeps the Top 10
+    # # longest continuous per-turbine segments.
+    # Continuous_Data(output_complete_data, continuous_time, 50, 10)
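To make the stuck-value detection above concrete, a minimal sketch of mark_abnormal_streaks() on synthetic rows (it assumes this module is importable as main and that the frame has a default RangeIndex, since the function indexes with df.at[i, col]):

```python
import pandas as pd
from main import mark_abnormal_streaks  # assumes the script's directory is on sys.path

df = pd.DataFrame({
    'C_WS':           [5.0, 5.0, 5.0, 5.0, 6.0],
    'C_WD':           [10,  10,  10,  10,  12],
    'C_ACTIVE_POWER': [1.0, 1.0, 1.0, 1.0, 2.0],
})
# With min_streak=3, the run of four identical rows is flagged; the changed row is kept.
mask = mark_abnormal_streaks(df, ['C_WS', 'C_WD', 'C_ACTIVE_POWER'], min_streak=3)
print(mask.tolist())  # [True, True, True, True, False]
```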

+ 39 - 0
DataBase/db-wind/savedata.py

@@ -0,0 +1,39 @@
+import pickle
+import pandas as pd
+import Arg
+import os
+
+arg = Arg.Arg()
+
+def saveData(name, data):
+    """
+    Save a DataFrame as csv under dataloc
+    :param name: file name
+    :param data: DataFrame to save
+    :return:
+    """
+    path = arg.dataloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    data.to_csv(path, index=False)
+
+def readData(name):
+    """
+    Read a csv from dataloc
+    :param name: file name
+    :return: DataFrame
+    """
+    path = arg.dataloc + r"/" + name
+    return pd.read_csv(path)
+
+def saveVar(name, data):
+    # Pickle an arbitrary object under varloc
+    path = arg.varloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    with open(path, 'wb') as file:
+        pickle.dump(data, file)
+
+def readVar(name):
+    # Load a pickled object from varloc
+    path = arg.varloc + r"/" + name
+    with open(path, "rb") as file:
+        obj = pickle.load(file)
+    return obj
+

+ 80 - 0
DataBase/db-wind/splitdata.py

@@ -0,0 +1,80 @@
+import pandas as pd
+import savedata
+
+
+def align_time_ranges():
+    # Split a large file by the time ranges of smaller files (kept for reference):
+    # large_file = "power_15min.csv"
+    # large_df = savedata.readData(large_file)
+    #
+    # # Read each small file into a dict
+    # small_files = []
+    # for i in range(6):
+    #     small_files.append("/Dataset_training/NWP/NWP_{}.csv".format(i))
+    # small_dfs = {}
+    #
+    # for file in small_files:
+    #     small_dfs[file] = savedata.readData(file)
+    #
+    # # Split the large file according to each small file's time range
+    # i = 0
+    # for file, df in small_dfs.items():
+    #     min_time = df["C_TIME"].min()
+    #     max_time = df["C_TIME"].max()
+    #     splitted_df = large_df[(large_df["C_TIME"] >= min_time) & (large_df["C_TIME"] <= max_time)]
+    #     savedata.saveData("/Dataset_training/power/power_{}.csv".format(i), splitted_df)
+    #     i = i + 1
+
+    filenames = ["Dataset_training/NWP/NWP_0.csv", "Dataset_training/power/power_0.csv"]
+    dataframes = []
+    for name in filenames:
+        dataframes.append(savedata.readData(name))
+
+    # Find the latest start time and the earliest end time across the files
+    max_start_time = max(df['C_TIME'].min() for df in dataframes)
+    min_end_time = min(df['C_TIME'].max() for df in dataframes)
+
+    print(max_start_time)
+    print(min_end_time)
+
+    # Trim each DataFrame to the common window [max_start_time, min_end_time]
+    for i, df in enumerate(dataframes):
+        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # make sure the time column is datetime
+        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
+
+        # Write the trimmed data back to the original file
+        savedata.saveData(filenames[i], df_filtered)
+
+def split_test():
+    # Read the NWP_0.csv and power_0.csv files
+    nwp_df = savedata.readData("Dataset_training/NWP/NWP_0.csv")
+    power_df = savedata.readData("Dataset_training/power/power_0.csv")
+
+    # The total row count across all NWP/power splits decides the test size
+    small_files = []
+    for i in range(6):
+        small_files.append("/Dataset_training/NWP/NWP_{}.csv".format(i))
+    for i in range(6):
+        small_files.append("/Dataset_training/power/power_{}.csv".format(i))
+    dataframes = []
+    for name in small_files:
+        dataframes.append(savedata.readData(name))
+    total_rows = 0
+    for df in dataframes:
+        total_rows = total_rows + len(df)
+    test_size = int(total_rows * 0.1)
+
+    nwp_test = nwp_df.iloc[-test_size:]
+    power_test = power_df.iloc[-test_size:]
+
+    nwp_train = nwp_df[~nwp_df["C_TIME"].isin(nwp_test["C_TIME"])]
+    power_train = power_df[~power_df["C_TIME"].isin(power_test["C_TIME"])]
+
+    savedata.saveData("/Dataset_test/NWP/NWP_test.csv", nwp_test)
+    savedata.saveData("/Dataset_test/power/power_test.csv", power_test)
+
+    savedata.saveData("/Dataset_training/NWP/NWP_5.csv", nwp_train)
+    savedata.saveData("/Dataset_training/power/power_5.csv", power_train)
+
+if __name__ == '__main__':
+    align_time_ranges()
+    # split_test()

+ 95 - 0
DataFill/data_fill.py

@@ -0,0 +1,95 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2024/5/6 13:52
+# file: data_fill.py
+# author: David
+# company: shenyang JY
+import os
+import numpy as np
+import pandas as pd
+from data_cleaning import rm_duplicated
+np.random.seed(42)
+
+
+class DataProcess(object):
+    def __init__(self, opt):
+        self.opt = opt
+
+    def get_train_data(self, unite):
+        # Step 1: compute time gaps
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_train = self.missing_time_splite(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0)/(15*60) - len(miss_points)
+        print("Re-checked, total points needing interpolation: {}".format(miss_number))
+        # Step 2: interpolate
+        if miss_number > 0 and self.opt.Model["train_data_fill"]:
+            data_train = self.data_fill(data_train)
+        return data_train
+
+    def get_test_data(self, unite):
+        # Step 1: compute time gaps
+        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
+        unite['time_diff'] = unite['C_TIME'].diff()
+        dt_short = pd.Timedelta(minutes=15)
+        dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
+        data_test = self.missing_time_splite(unite, dt_short, dt_long)
+        miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
+        miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
+        print("Re-checked, total points needing interpolation: {}".format(miss_number))
+        # Step 2: interpolate
+        if self.opt.Model["predict_data_fill"] and miss_number > 0:
+            data_test = self.data_fill(data_test, test=True)
+        return data_test
+
+    def missing_time_splite(self, df, dt_short, dt_long):
+        """
+        Segmentation:
+        dt_short: native data frequency (one point per 15 min)
+        dt_long: gaps shorter than dt_long are filled; gaps of at least dt_long split the data
+        """
+        n_long, n_short, n_points = 0, 0, 0
+        start_index = 0
+        dfs = []
+        for i in range(1, len(df)):
+            if df['time_diff'][i] >= dt_long:
+                df_long = df.iloc[start_index:i, :-1]
+                dfs.append(df_long)
+                start_index = i
+                n_long += 1
+            if df['time_diff'][i] > dt_short:
+                print(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
+                points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
+                print("Missing points: {}".format(points))
+                if df['time_diff'][i] < dt_long:
+                    n_short += 1
+                    n_points += points
+                    print("Points to fill: {}".format(points))
+        dfs.append(df.iloc[start_index:, :-1])
+        print(f"Total rows: {len(df)}, gaps in the series: {n_short}, of which long gaps: {n_long}")
+        print("Total points to fill: {}".format(n_points))
+        return dfs
+
+    def data_fill(self, dfs, test=False):
+        """
+        Filling:
+        dfs: list of DataFrames to fill
+        test: train/test flag
+        """
+        dfs_fill, inserts = [], 0
+        for i, df in enumerate(dfs):
+            df = rm_duplicated(df)
+            df1 = df.set_index('C_TIME', inplace=False)
+            # The fill method is swappable; backfill after a 15-min resample for now
+            dff = df1.resample('15T').bfill()
+            dff.reset_index(inplace=True)
+            points = len(dff) - len(df1)
+            dfs_fill.append(dff)
+            print("{} ~ {} has {} points, filled {} points.".format(dff.iloc[0, 0], dff.iloc[-1, 0], len(dff), points))
+            inserts += points
+        name = "test data" if test is True else "training data"
+        print("{} was split into {} segments".format(name, len(dfs_fill)))
+        return dfs_fill
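A minimal sketch of the resample-then-backfill step used by data_fill() (synthetic series with one missing 15-minute point):

```python
import pandas as pd

df = pd.DataFrame(
    {'v': [1.0, 3.0]},
    index=pd.to_datetime(['2024-05-06 00:00', '2024-05-06 00:30']),
)
# resample('15T') reintroduces the missing 00:15 slot; bfill copies the 00:30 value into it.
print(df.resample('15T').bfill())
#                        v
# 2024-05-06 00:00:00  1.0
# 2024-05-06 00:15:00  3.0
# 2024-05-06 00:30:00  3.0
```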

+ 2 - 0
README.md

@@ -0,0 +1,2 @@
+# Renewable-energy power forecasting - data analysis
+### —— Maintained by Liu Dawei and Yang Long

+ 16 - 0
main.py

@@ -0,0 +1,16 @@
+# This is a sample Python script.
+
+# Press Shift+F10 to execute it or replace it with your code.
+# Double-press Shift to search everywhere for classes, files, tool windows, actions and settings.
+
+
+def print_hi(name):
+    # Use a breakpoint in the code line below to debug your script.
+    print(f'Hi, {name}')  # Press Ctrl+F8 to toggle the breakpoint.
+
+
+# Press the green button in the gutter to run the script.
+if __name__ == '__main__':
+    print_hi('PyCharm')
+
+# Visit https://www.jetbrains.com/help/pycharm/ for PyCharm help