liudawei 1 năm trước cách đây
mục cha
commit
690bb5b465
8 tập tin đã thay đổi với 400 bổ sung415 xóa
  1. 2 1
      .gitignore
  2. 8 4
      db-wind/Arg.py
  3. 79 0
      db-wind/data_cleaning.py
  4. 0 393
      db-wind/getdata/inputData.py
  5. 291 0
      db-wind/inputData.py
  6. 19 16
      db-wind/main.py
  7. 1 1
      db-wind/savedata.py
  8. 0 0
      db-wind/splitdata.py

+ 2 - 1
.gitignore

@@ -25,4 +25,5 @@
 db-wind/getdata/__pycache__/
 db-light/getdata/__pycache__/
 db-light/getdata/data/
-db-light/utils/__pycache__/
+db-light/utils/__pycache__/
+/db-wind/data/

+ 8 - 4
db-wind/utils/Arg.py → db-wind/Arg.py

@@ -1,12 +1,16 @@
 class Arg:
     def __init__(self):
         # 数据库地址
-        self.database = "mysql+pymysql://root:mysql_T7yN3E@192.168.12.10:19306/ipfcst_j00577_20240604171557"
+        self.database = "mysql+pymysql://root:mysql_T7yN3E@192.168.12.10:19306/ipfcst_j00314_20240516121839"
         # 数据存放位置
-        self.dataloc = "../577/"
+        self.dataloc = "../data/314/"
         # 变量存放位置
-        self.varloc = "../577/var/"
+        self.varloc = "../data/314/var/"
         # 测风塔个数
         self.towerloc = [1]
         # 机头编号
-        self.turbineloc = [i for i in range(1, 16)]
+        self.turbineloc = [i for i in range(1, 16)]
+
+        self.begin = '2023-01-01'
+        self.end = '2024-05-31'
+

+ 79 - 0
db-wind/data_cleaning.py

@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/10/11 11:00
+# file: data_cleaning.py
+# author: David
+# company: shenyang JY
+import numpy as np
+
+
+def cleaning(df, name, cols=None, dup=True):
+    print("开始清洗:{}……".format(name))
+    data = df.copy()
+    if len(data) > 0:
+        data = data_column_cleaning(data)
+    if dup:
+        data = rm_duplicated(data)
+    if cols is not None:
+        data = key_field_row_cleaning(data, cols)
+    else:
+        data = interpolation(data)
+    return data
+
+def data_column_cleaning(data, clean_value=[-9999.0, -99, -99.0]):
+    """
+    列的清洗
+    :param data:
+    :param clean_value:
+    :return:
+    """
+    cols_pre = data.columns.to_list()
+    for val in clean_value:
+        data = data.replace(val, np.nan)
+    # nan 列超过80% 删除
+    data = data.dropna(axis=1, thresh=len(data) * 0.8)
+    # 删除取值全部相同的列
+    data = data.loc[:, (data != data.iloc[0]).any()]
+    cols_late = data.columns.tolist()
+    if len(cols_pre) > len(cols_late):
+        print("清洗的列有:{}".format(set(cols_pre) - set(cols_late)))
+    return data
+
+
+def interpolation(data):
+    # 剩下的nan进行线性插值
+    data = data.bfill()
+    return data
+
+
+def key_field_row_cleaning(data, cols):
+    """
+    行的重要字段清洗: 过滤含有-99的数字,过滤空值
+    :param data:
+    :param cols: 指定的字段列表
+    :return:
+    """
+    rows_pre = len(data)
+    for col in cols:
+        if col in data.columns.tolist():
+            data = data[~((data.loc[:, col] < 0) & (data.loc[:, col].astype(str).str.contains('99')))]
+            data = data[~data.loc[:, col].isnull()]
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("清洗的行数有:", rows_pre-rows_late)
+    return data
+
+def rm_duplicated(data):
+    """
+    按照时间去重
+    :param data:
+    :return:
+    """
+    # 按照时间去重
+    rows_pre = len(data)
+    data = data.groupby(by='C_TIME').mean()
+    data.reset_index(inplace=True)
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        print("时间去重的行数有:", rows_pre - rows_late)
+    return data

+ 0 - 393
db-wind/getdata/inputData.py

@@ -1,393 +0,0 @@
-import pymysql
-import pandas as pd
-import numpy as np
-from sqlalchemy import create_engine
-import matplotlib.pyplot as plt
-import pytz
-plt.rcParams['font.sans-serif'] = ['SimHei']
-import utils.savedata
-from utils import Arg
-arg = Arg.Arg()
-
-def clear_data():
-    """
-    删除所有csv
-    :return:
-    """
-    # 设置文件夹路径
-    import glob
-    import os
-    folder_path = arg.dataloc
-
-    # 使用 glob 获取所有的 .csv 文件路径
-    csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
-
-    # 遍历所有 .csv 文件并删除
-    for file_path in csv_files:
-        os.remove(file_path)
-    print("清除所有scv文件")
-
-def create_database(database):
-    """
-    创建数据库连接
-    :param database: 数据库地址
-    :return:
-    """
-    engine = create_engine(database)
-    return engine
-
-def exec_sql(sql,engine):
-    """
-    从数据库获取数据
-    :param sql: sql语句
-    :param engine: 数据库对象
-    :return:
-    """
-    df = pd.read_sql_query(sql, engine)
-    return df
-
-def get_process_NWP(database):
-    """
-    从数据库中获取NWP数据,并进行简单处理
-    :param database:
-    :return:
-    """
-    # NPW数据
-    engine = create_database(database)
-    sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
-    NWP = exec_sql(sql_NWP, engine)
-
-    #删除后三位
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(str)
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].str[:-3]
-
-    # 将 'timestamp' 列转换为日期时间格式
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(float)
-    NWP['C_PRE_TIME'] = pd.to_datetime(NWP['C_PRE_TIME'], unit='s')
-
-    # 将日期时间转换为本地时区
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.tz_localize(pytz.utc).dt.tz_convert('Asia/Shanghai')
-
-    # 格式化日期时间为年月日时分秒
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
-
-    NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
-
-    utils.savedata.saveData("NWP.csv",NWP)
-    return NWP
-
-def get_process_turbine(database):
-    """
-    从数据库中获取风头数据,并进行简单处理
-    :param database:
-    :return:
-    """
-
-    # 获取NWP数据
-    NWP = utils.savedata.readData("NWP.csv")
-    NWP_date = NWP.iloc[:,0]
-    print(NWP_date)
-
-    # 机头数据
-    engine = create_database(database)
-    for i in arg.turbineloc:
-        print("导出风机{}的数据".format(i))
-        sql_turbine = "select C_TIME,C_DATA1 as C_WS, C_DATA2 as C_WD, C_DATA3 as C_ACTIVE_POWER from t_wind_turbine_status_data WHERE C_EQUIPMENT_NO=" + str(i)
-        turbine = exec_sql(sql_turbine, engine)
-
-        #直接导出所有数据
-        utils.savedata.saveData("turbine-all/turbine-{}.csv".format(i), turbine)
-
-        #每15分钟导出一个数据
-        filtered_df = turbine[turbine['C_TIME'].isin(NWP_date)]
-        utils.savedata.saveData("turbine-15/turbine-{}.csv".format(i), filtered_df)
-
-def get_process_tower(database):
-    """
-    获取测风塔数据
-    :param database:
-    :return:
-    """
-    engine = create_database(database)
-    print("现有测风塔:{}".format(arg.towerloc))
-    for i in arg.towerloc:
-        print("测风塔{}导出数据".format(i))
-        # 删除没用的列
-        drop_colmns = ["C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"]
-
-        get_colmns = []
-        # 查询表的所有列名
-        result_set = exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
-        for name in result_set.iloc[:,0]:
-            if name not in drop_colmns:
-                get_colmns.append(name)
-
-        all_columns_str = ", ".join([f'{col}' for col in get_colmns])
-
-        tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO="+str(i)
-        tower = exec_sql(tower_sql, engine)
-        utils.savedata.saveData("tower/tower-{}.csv".format(i), tower)
-
-def get_process_power(database):
-    """
-    获取整体功率数据
-    :param database:
-    :return:
-    """
-    engine = create_database(database)
-    sql_power = "select C_TIME,C_REAL_VALUE from t_power_station_status_data"
-    power = exec_sql(sql_power, engine)
-    utils.savedata.saveData("power.csv", power)
-
-
-def get_process_dq(database):
-    """
-    获取短期预测结果
-    :param database:
-    :return:
-    """
-    engine = create_database(database)
-    sql_dq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_short_term_his"
-    dq = exec_sql(sql_dq, engine)
-    dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
-    utils.savedata.saveData("dq.csv", dq)
-
-def get_process_cdq(database):
-    """
-    获取超短期预测结果
-    :param database:
-    :return:
-    """
-    engine = create_database(database)
-    sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_ultra_short_term_his"
-    cdq = exec_sql(sql_cdq, engine)
-    cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
-    utils.savedata.saveData("cdq.csv", cdq)
-
-
-def get_turbine_info(database):
-    """
-    获取风机信息
-    :param database:
-    :return:
-    """
-    engine = create_engine(database)
-    sql_turbine = "select C_ID, C_LATITUDE as '纬度', C_LONGITUDE as '经度', C_HUB_HEIGHT as '轮毂高度' from t_wind_turbine_info"
-    turbine_info = exec_sql(sql_turbine, engine)
-    utils.savedata.saveData("风机信息.csv", turbine_info)
-
-def indep_process():
-    """
-    进一步数据处理:时间统一处理等
-    :return:
-    """
-    # 测风塔数据处理
-    for i in arg.towerloc:
-        tower = utils.savedata.readData("/tower/tower-{}.csv".format(i))
-        # 判断每一列是否全是 -99
-        all_minus_99 = (tower == -99).all()
-
-        # 获取全是 -99 的列的列名
-        cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
-
-        # 使用 drop() 方法删除列
-        tower = tower.drop(cols_to_drop, axis=1)
-        # MBD: 将一部分是-99的列删除,把-99替换为nan
-        tower_nan = tower.replace(-99, np.nan, inplace=False)
-        # nan 超过80% 删除
-        tower = tower.dropna(axis=1, thresh=len(tower_nan) * 0.8)
-        utils.savedata.saveData("/tower/tower-{}-process.csv".format(i), tower)
-
-    # 测风塔时间统一
-    tower1 = utils.savedata.readData("/tower/tower-{}-process.csv".format(1))
-    # tower2 = utils.savedata.readData("/tower/tower-{}-process.csv".format(2))
-    # tower1 = tower1[tower1['C_TIME'].isin(tower2['C_TIME'])]
-    # tower2 = tower2[tower2['C_TIME'].isin(tower1['C_TIME'])]
-
-    utils.savedata.saveData("/tower/tower-{}-process.csv".format(1), tower1)
-    # utils.savedata.saveData("/tower/tower-{}-process.csv".format(2), tower2)
-
-    # 所有表时间统一
-    filenames = ["/NWP.csv","/power.csv", "/dq.csv", "/cdq.csv", '/tower/tower-1-process.csv']
-    dataframes = []
-    for i in arg.turbineloc:
-        filenames.append("/turbine-15/turbine-{}.csv".format(i))
-    for name in filenames:
-        dataframes.append(utils.savedata.readData(name))
-
-    # 查找最大起始时间和最小结束时间
-    max_start_time = max(df['C_TIME'].min() for df in dataframes)
-    min_end_time = min(df['C_TIME'].max() for df in dataframes)
-
-    print(max_start_time)
-    print(min_end_time)
-
-    # 重新调整每个 DataFrame 的时间范围,只保留在 [max_start_time, min_end_time] 区间内的数据
-    for i, df in enumerate(dataframes):
-        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # 确保时间列是 datetime 类型
-        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
-
-        # 将结果保存到新文件,文件名为原文件名加上 "_filtered" 后缀
-        utils.savedata.saveData(filenames[i],df_filtered)
-
-def NWP_indep_process():
-    """
-    将NWP数据按照缺失值数量划分为N个不同数据集
-    :return:
-    """
-    # NWP数据进一步处理
-    NWP = utils.savedata.readData("NWP.csv")
-    df = pd.to_datetime(NWP['C_TIME'])
-    time_diff = df.diff()
-    time_diff_threshold = pd.Timedelta(minutes=15)
-    missing_values = df[time_diff > time_diff_threshold]
-    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
-    print(missing_values)
-
-    # 文件保存
-    utils.savedata.saveVar("NWP_miss.pickle", missing_values)
-    split_indices = []
-    for i in range(len(missing_values)):
-        if i == 0:
-            split_indices.append((0, missing_values.index[i]))
-        else:
-            split_indices.append((missing_values.index[i - 1], missing_values.index[i]))
-    split_indices.append((missing_values.index[-1], len(df)))  # MBD: 分割少了一个点
-    split_datasets = [NWP.iloc[start:end,:] for start, end in split_indices]
-    for i, split_df in enumerate(split_datasets):
-        utils.savedata.saveData("Dataset_training/NWP/NWP_{}.csv".format(i),split_df)
-
-    return split_datasets
-
-
-# def power_indep_process():
-#     NWP = utils.savedata.readData("power.csv")
-
-def Data_split():
-    """
-    这个函数没用上,可以不看
-    :return:
-    """
-    NWP = utils.savedata.readData("power_15min.csv")
-    df = pd.to_datetime(NWP['C_TIME'])
-    time_diff = df.diff()
-    time_diff_threshold = pd.Timedelta(minutes=15)
-    missing_values = df[time_diff > time_diff_threshold]
-    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
-    print(missing_values)
-    NWP_miss = utils.savedata.readVar("NWP_miss.pickle")
-
-    for t in missing_values.index:
-        a = t-1
-        b = t
-        time1 = NWP['C_TIME'][a]
-        time2 = NWP['C_TIME'][b]
-        df = pd.to_datetime([time1, time2])
-        # 计算时间差
-        time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
-        print(time_diff)
-
-    time1 = "2022-10-27 14:00:00"
-    time2 = "2023-04-16 12:00:00"
-    df = pd.to_datetime([time1, time2])
-    # 计算时间差
-    time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
-    print(time_diff)
-
-def time_all_in():
-    """
-    这个函数暂时没用上,这个函数的目的是给机头数据进行填充,找到时间缺失的位置,填充为-99
-    :return:
-    """
-    filenames = []
-    dataframes = []
-    for i in arg.turbineloc:
-        filenames.append("/turbine-15/turbine-{}.csv".format(i))
-    for name in filenames:
-        dataframes.append(utils.savedata.readData(name))
-
-    for df in dataframes:
-        df['C_TIME'] = pd.to_datetime(df['C_TIME'])
-
-        # 创建一个完整的时间序列索引,包括所有可能的时间点
-        start_time = df['C_TIME'].min()
-        end_time = df['C_TIME'].max()
-        full_time_range = pd.date_range(start_time, end_time, freq='15min')
-
-        # 使用完整的时间序列索引创建一个空的 DataFrame
-        full_df = pd.DataFrame(index=full_time_range)
-        full_df.index.name = 'C_TIME'
-
-        # 将原始数据与空的 DataFrame 合并
-        merged_df = full_df.merge(df, how='left', left_on='time', right_on='time')
-
-        # 使用 -99 填充缺失值,除了时间列
-        merged_df.fillna(-99, inplace=True)
-        merged_df.reset_index(inplace=True)
-
-
-
-def data_process(database):
-    """
-    数据导出+初步处理的总操控代码
-    :param database:
-    :return:
-    """
-    clear_data()
-    get_process_NWP(database)
-    get_process_turbine(database)
-    get_turbine_info(database)
-    get_process_tower(database)
-    get_process_power(database)
-    get_process_dq(database)
-    # get_process_cdq(database)
-    indep_process()
-    NWP_indep_process()
-    # Data_split()
-
-if __name__ == '__main__':
-
-    import os
-    import glob
-    # 设置文件夹路径
-    folder_path = '../data'
-
-    # 使用 glob 获取所有的 .csv 文件
-    csv_files = glob.glob(os.path.join(folder_path, '*.csv'))
-
-    # 遍历所有 .csv 文件并删除
-    for file_path in csv_files:
-        os.remove(file_path)
-    # database = "mysql+pymysql://root:!QAZ2root@192.168.1.205:3306/ipfcst-sishui-a"
-    # engine = create_database(database)
-    #
-    # # NPW数据
-    # sql_NWP = "select C_SC_DATE,C_SC_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
-    # NWP = exec_sql(sql_NWP,engine)
-    #
-    # # 分风机功率
-    # sql_wind = "select C_WS,C_ACTIVE_POWER from t_wind_turbine_status_data_15 WHERE C_EQUIPMENT_NO=2 and C_WS>0 and C_ACTIVE_POWER>0  "
-    # df_wind = exec_sql(sql_wind,engine)
-    # print(df_wind)
-
-    # 总功率数据读取
-    # sql_power = "select * from t_power_station_status_data"
-    # df_power = exec_sql(sql_power, engine)
-    filenames = []
-    dataframes = []
-    for i in arg.turbineloc:
-        filenames.append("../data/turbine-15/turbine-{}.csv".format(i))
-    for name in filenames:
-        dataframes.append(pd.read_csv(name).iloc[:7000,:])
-
-    # for df in  enumerate(dataframes):
-    #     df =
-    mean_of_first_columns = pd.concat([df['C_WS'] for df in dataframes], axis=1).mean(axis=1)
-    mean_of_second_columns = (pd.concat([df['C_ACTIVE_POWER'] for df in dataframes], axis=1).sum(axis=1)/1000).astype(int)
-    print(len(mean_of_first_columns))
-
-
-    plt.scatter(mean_of_first_columns, mean_of_second_columns)
-    plt.show()
-
-

+ 291 - 0
db-wind/inputData.py

@@ -0,0 +1,291 @@
+import pandas as pd
+import datetime, time
+import pytz
+from savedata import saveData, readData
+import os
+from sqlalchemy import create_engine
+import pytz
+from data_cleaning import cleaning, rm_duplicated
+current_path = os.path.dirname(__file__)
+dataloc = current_path + '/data/'
+
+def readData(name):
+    """
+    读取数据
+    :param name: 名字
+    :return:
+    """
+    path = dataloc + r"/" + name
+    return pd.read_csv(path)
+
+
+def saveData(name, data):
+    """
+    存放数据
+    :param name: 名字
+    :param data: 数据
+    :return:
+    """
+    path = dataloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    data.to_csv(path, index=False)
+
+
+def timestamp_to_datetime(ts):
+    local_timezone = pytz.timezone('Asia/Shanghai')
+    if type(ts) is not int:
+        raise ValueError("timestamp-时间格式必须是整型")
+    if len(str(ts)) == 13:
+        dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    elif len(str(ts)) == 10:
+        dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    else:
+        raise ValueError("timestamp-时间格式错误")
+
+
+def timestr_to_timestamp(time_str):
+    """
+    将时间字符串转换为毫秒级时间戳
+    :param time_str: str or int(已是时间戳时原样返回)
+    :return: int,毫秒级时间戳
+    """
+    if isinstance(time_str, str):
+        if len(time_str) == 10:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
+            return int(round(time.mktime(dt.timetuple())) * 1000)
+        elif len(time_str) in {17, 18, 19}:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')   # strptime字符串解析必须严格按照字符串中的格式
+            return int(round(time.mktime(dt.timetuple())) * 1000)   # 转换成毫秒级的时间戳
+        else:
+            raise ValueError("时间字符串长度不满足要求!")
+    else:
+        return time_str
+
+
+class DataBase(object):
+    def __init__(self, begin, end, database):
+        self.begin = begin
+        self.end = end - pd.Timedelta(minutes=15)
+        self.begin_stamp = timestr_to_timestamp(str(begin))
+        self.end_stamp = timestr_to_timestamp(str(self.end))
+        self.database = database
+        self.towerloc = [1]
+
+    def clear_data(self):
+        """
+        删除所有csv
+        :return:
+        """
+        # 设置文件夹路径
+        import glob
+        import os
+        folder_path = dataloc
+
+        # 使用 glob 获取所有的 .csv 文件路径
+        csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
+
+        # 遍历所有 .csv 文件并删除
+        for file_path in csv_files:
+            os.remove(file_path)
+        print("清除所有csv文件")
+
+    def create_database(self):
+        """
+        创建数据库连接
+        :param database: 数据库地址
+        :return:
+        """
+        engine = create_engine(self.database)
+        return engine
+
+    def exec_sql(self, sql, engine):
+        """
+        从数据库获取数据
+        :param sql: sql语句
+        :param engine: 数据库对象
+        :return:
+        """
+        df = pd.read_sql_query(sql, engine)
+        return df
+
+    def split_time(self, data):
+        data.set_index('C_TIME', inplace=True)
+        data = data.sort_index().loc[self.begin: self.end]
+        data.reset_index(drop=False, inplace=True)
+        return data
+
+    def get_process_NWP(self):
+        """
+        从数据库中获取NWP数据,并进行简单处理
+        :param database:
+        :return:
+        """
+        # NPW数据
+        engine = self.create_database()
+        sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
+                  "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION,  " \
+                  "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
+                  "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp" \
+                  " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 风的NWP字段
+        NWP = self.exec_sql(sql_NWP, engine)
+
+        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
+
+        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
+        NWP = cleaning(NWP, 'NWP')
+        # NWP = self.split_time(NWP)
+        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("NWP.csv", NWP)
+        return NWP
+
+    def get_process_tower(self):
+        """
+        获取测风塔数据(查询 t_wind_tower_status_data 表)
+        :param database:
+        :return:
+        """
+        engine = self.create_database()
+        print("提取测风塔:{}".format(self.towerloc))
+        for i in self.towerloc:
+            # 删除没用的列
+            drop_colmns = ["C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"]
+
+            get_colmns = []
+            # 查询表的所有列名
+            result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
+            for name in result_set.iloc[:, 0]:
+                if name not in drop_colmns:
+                    get_colmns.append(name)
+
+            all_columns_str = ", ".join([f'{col}' for col in get_colmns])
+            tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO="+str(i) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)
+            tower = self.exec_sql(tower_sql, engine)
+            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
+            saveData("/tower-{}.csv".format(i), tower)
+            print("测风塔{}导出数据".format(i))
+
+
+    def get_process_power(self):
+        """
+        获取整体功率数据
+        :param database:
+        :return:
+        """
+        engine = self.create_database()
+        sql_cap = "select C_CAPACITY from t_electric_field"
+        cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
+        sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
+                    " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
+        # if self.opt.usable_power["clean_power_by_signal"]:
+        #     sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and  C_IS_RATIONING_BY_AUTO_CONTROL=0"
+        powers = self.exec_sql(sql_power, engine)
+        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
+        mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap)
+        mask = powers['C_REAL_VALUE'] == -99
+
+        mask = mask | mask1
+        print("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum()))
+        powers = powers[~mask]
+        print("剔除完后还剩{}条".format(len(powers)))
+        binary_map = {b'\x00': 0, b'\x01': 1}
+        powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
+        powers = rm_duplicated(powers)
+        saveData("power.csv", powers)
+
+    def get_process_dq(self):
+        """
+        获取短期预测结果
+        :param database:
+        :return:
+        """
+        engine = self.create_database()
+        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
+                 "where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        dq = self.exec_sql(sql_dq, engine)
+        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
+        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
+        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
+        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
+        dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
+        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("dq.csv", dq)
+
+    def indep_process(self):
+        """
+        进一步数据处理:时间统一处理等
+        :return:
+        """
+        # 测风塔数据处理
+        for i in self.towerloc:
+            tower = readData("/tower-{}.csv".format(i))
+            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
+            # 判断每一列是否全是 -99
+            all_minus_99 = (tower == -99).all()
+
+            # 获取全是 -99 的列的列名
+            cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
+
+            # 使用 drop() 方法删除列
+            tower = tower.drop(cols_to_drop, axis=1)
+            tower = cleaning(tower, 'tower', ['C_WS_INST_HUB_HEIGHT'])
+            saveData("/tower-{}-process.csv".format(i), tower)
+
+    def get_process_cdq(self):
+        """
+        获取超短期预测结果
+        :param database:
+        :return:
+        """
+        engine = self.create_database()
+        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \
+                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        cdq = self.exec_sql(sql_cdq, engine)
+        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
+        cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
+        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
+        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("cdq.csv", cdq)
+
+    def get_process_turbine(self):
+        """
+        从数据库中获取风头数据,并进行简单处理
+        :param database:
+        :return:
+        """
+        number = self.opt.usable_power['turbine_id']
+        # 机头数据
+        engine = self.create_database()
+
+        self.logger.info("导出风机{}的数据".format(number))
+        sql_turbine = "select C_TIME, C_WS, C_WD, C_ACTIVE_POWER from t_wind_turbine_status_data " \
+                      "WHERE C_EQUIPMENT_NO=" + str(number) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)    # + " and C_WS>0 and C_ACTIVE_POWER>0"
+        turbine = self.exec_sql(sql_turbine, engine)
+        turbine = cleaning(turbine, 'cdq', cols=['C_WS', 'C_ACTIVE_POWER'], dup=False)
+        turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
+        # 直接导出所有数据
+        saveData("turbine-{}.csv".format(number), turbine)
+
+
+
+    def data_process(self):
+        """
+        数据导出+初步处理的总操控代码
+        :param database:
+        :return:
+        """
+        self.clear_data()
+        try:
+            self.get_process_power()
+            self.get_process_dq()
+            # self.get_process_cdq()
+            self.get_process_NWP()
+            self.get_process_tower()
+            # self.get_process_turbine()
+            self.indep_process()
+        except Exception as e:
+            print("导出数据出错:{}".format(e.args))
+
+
+

+ 19 - 16
db-wind/main.py

@@ -1,8 +1,8 @@
 import pandas as pd
 import os
 from datetime import timedelta
-from getdata import inputData
-from utils import Arg
+from inputData import DataBase
+import Arg
 #——————————————————————————机头风速-99和连续异常值清洗代码——————————————————————————————
 def mark_abnormal_streaks(df, columns, min_streak):
     abnormal_mask = pd.Series(False, index=df.index)
@@ -205,19 +205,22 @@ def Continuous_Data(input_dir,output_dir,M,TopN):
 
 
 if __name__ == "__main__":
+    import datetime
     arg = Arg.Arg()
-    inputData.data_process(arg.database)
-    input_dir = "../data_mts/turbine-15"  # 输入文件夹路径
-    output_dir = "../data_mts/output_clean_csv_files"  # 输出文件夹路径
-    # 对机头风速连续异常值和-99进行清洗,第三个参数是连续5个值不变以后就认为异常
-    # 这步会生成一个"output_clean_csv_files"文件夹,里面包含全部单机的数据,存储的机头风速只清理了-99,参数50是风机数量+1,风机参数5就是连续5个点的认为是异常值,全部剔除。
-    process_csv_files(input_dir, output_dir, 50, 5)
-    output_dir_time_Merge = "../data_mts/output_filtered_csv_files"
-    # 这步会生成一个"output_filtered_csv_files"文件夹,在上一步的基础上,对齐了全部风机的时间,只各自保留了交集。
-    TimeMerge(output_dir,output_dir_time_Merge,50)
-    output_complete_data = "../data_mts/complete_data"
+    db = DataBase(begin=datetime.datetime.strptime(arg.begin, '%Y-%m-%d'),
+                  end=datetime.datetime.strptime(arg.end, '%Y-%m-%d'), database=arg.database)
+    db.data_process()
+    # input_dir = "../data_mts/turbine-15"  # 输入文件夹路径
+    # output_dir = "../data_mts/output_clean_csv_files"  # 输出文件夹路径
+    # # 对机头风速连续异常值和-99进行清洗,第三个参数是连续5个值不变以后就认为异常
+    # # 这步会生成一个"output_clean_csv_files"文件夹,里面包含全部单机的数据,存储的机头风速只清理了-99,参数50是风机数量+1,风机参数5就是连续5个点的认为是异常值,全部剔除。
+    # process_csv_files(input_dir, output_dir, 50, 5)
+    # output_dir_time_Merge = "../data_mts/output_filtered_csv_files"
+    # # 这步会生成一个"output_filtered_csv_files"文件夹,在上一步的基础上,对齐了全部风机的时间,只各自保留了交集。
+    # TimeMerge(output_dir,output_dir_time_Merge,50)
+    # output_complete_data = "../data_mts/complete_data"
     # 这步会生成一个"complete_data"文件夹,在上一步的基础上,填充了10个时间点之内的缺失。
-    MissingPointProcessing(output_dir_time_Merge,output_complete_data,50,10)
-    continuous_time = "../data_mts/continuous_data"
-    # 这步会生成一个"Continuous_data"文件夹,在上一步的基础上,取Top10个连续时间段最长的单机数据。
-    Continuous_Data(output_complete_data, continuous_time, 50, 10)
+    # MissingPointProcessing(output_dir_time_Merge,output_complete_data,50,10)
+    # continuous_time = "../data_mts/continuous_data"
+    # # 这步会生成一个"Continuous_data"文件夹,在上一步的基础上,取Top10个连续时间段最长的单机数据。
+    # Continuous_Data(output_complete_data, continuous_time, 50, 10)

+ 1 - 1
db-wind/utils/savedata.py → db-wind/savedata.py

@@ -1,6 +1,6 @@
 import pickle
 import pandas as pd
-from utils import Arg
+import Arg
 import os
 
 arg = Arg.Arg()

+ 0 - 0
db-wind/getdata/splitdata.py → db-wind/splitdata.py