import pymysql
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import matplotlib.pyplot as plt
import pytz

# Allow CJK characters in matplotlib figure labels.
plt.rcParams['font.sans-serif'] = ['SimHei']

import utils.savedata
from utils import Arg

arg = Arg.Arg()


def clear_data():
    """
    Delete every .csv file under the data directory (recursively).
    :return: None
    """
    import glob
    import os
    folder_path = arg.dataloc
    # Collect every .csv below the data root, then remove them one by one.
    csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
    for file_path in csv_files:
        os.remove(file_path)
    print("清除所有scv文件")


def create_database(database):
    """
    Create a SQLAlchemy engine for the given database URL.
    :param database: database connection URL
    :return: SQLAlchemy engine
    """
    engine = create_engine(database)
    return engine


def exec_sql(sql, engine):
    """
    Run a SQL query and return the result set as a DataFrame.
    :param sql: SQL statement
    :param engine: SQLAlchemy engine
    :return: pandas DataFrame with the query result
    """
    df = pd.read_sql_query(sql, engine)
    return df


def get_process_NWP(database):
    """
    Fetch NWP (numerical weather prediction) data and normalize its timestamps.

    C_PRE_TIME arrives as a millisecond epoch; it is converted to an
    'Asia/Shanghai' local-time string and renamed to C_TIME before saving.
    :param database: database connection URL
    :return: processed NWP DataFrame
    """
    engine = create_database(database)
    sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
    NWP = exec_sql(sql_NWP, engine)

    # Drop the last three digits (milliseconds) so the value is a seconds epoch.
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(str)
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].str[:-3]

    # Convert the epoch to a datetime.
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(float)
    NWP['C_PRE_TIME'] = pd.to_datetime(NWP['C_PRE_TIME'], unit='s')

    # Convert UTC to the local time zone, then format as 'YYYY-mm-dd HH:MM:SS'.
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.tz_localize(pytz.utc).dt.tz_convert('Asia/Shanghai')
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')

    NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
    utils.savedata.saveData("NWP.csv", NWP)
    return NWP


def get_process_turbine(database):
    """
    Fetch per-turbine status data and save both the full series and a
    15-minute subset aligned to the saved NWP timestamps.
    :param database: database connection URL
    :return: None
    """
    # NWP timestamps define the 15-minute grid used for filtering.
    NWP = utils.savedata.readData("NWP.csv")
    NWP_date = NWP.iloc[:, 0]
    print(NWP_date)

    engine = create_database(database)
    for i in arg.turbineloc:
        print("导出风机{}的数据".format(i))
        sql_turbine = "select C_TIME,C_WS,C_WD,C_ACTIVE_POWER from t_wind_turbine_status_data WHERE C_EQUIPMENT_NO=" + str(i)  # + " and C_WS>0 and C_ACTIVE_POWER>0"
        turbine = exec_sql(sql_turbine, engine)

        # Export all rows unmodified.
        utils.savedata.saveData("turbine-all/turbine-{}.csv".format(i), turbine)

        # Keep only rows whose timestamp exists in the NWP series (one per 15 min).
        filtered_df = turbine[turbine['C_TIME'].isin(NWP_date)]
        utils.savedata.saveData("turbine-15/turbine-{}.csv".format(i), filtered_df)


def get_process_tower(database):
    """
    Fetch wind-tower measurements, excluding known-useless columns.
    :param database: database connection URL
    :return: None
    """
    engine = create_database(database)
    print("现有测风塔:{}".format(arg.towerloc))
    for i in arg.towerloc:
        print("测风塔{}导出数据".format(i))
        # Columns with no analytic value; they are excluded from the SELECT.
        drop_colmns = ["C_DATA1", "C_DATA2", "C_DATA3", "C_DATA4", "C_DATA5", "C_DATA6", "C_DATA7", "C_DATA8", "C_DATA9", "C_DATA10", "C_IS_GENERATED", "C_ABNORMAL_CODE"]
        get_colmns = []
        # Discover the table's column names, then keep everything not dropped.
        result_set = exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
        for name in result_set.iloc[:, 0]:
            if name not in drop_colmns:
                get_colmns.append(name)

        all_columns_str = ", ".join([f'{col}' for col in get_colmns])
        tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO=" + str(i)
        tower = exec_sql(tower_sql, engine)
        utils.savedata.saveData("tower/tower-{}.csv".format(i), tower)


def get_process_power(database):
    """
    Fetch the station-level power series and save it.
    :param database: database connection URL
    :return: None
    """
    engine = create_database(database)
    sql_power = "select C_TIME,C_REAL_VALUE from t_power_station_status_data"
    power = exec_sql(sql_power, engine)
    utils.savedata.saveData("power.csv", power)


def get_turbine_info(database):
    """
    Fetch static turbine information (position, hub height) and save it.
    :param database: database connection URL
    :return: None
    """
    # Use the shared helper for consistency with the other exporters
    # (previously called create_engine directly).
    engine = create_database(database)
    sql_turbine = "select C_ID, C_LATITUDE as '纬度', C_LONGITUDE as '经度', C_HUB_HEIGHT as '轮毂高度' from t_wind_turbine_info"
    turbine_info = exec_sql(sql_turbine, engine)
    utils.savedata.saveData("风机信息.csv", turbine_info)


def indep_process():
    """
    Further processing: clean tower data and align all tables to a common
    time range.
    :return: None
    """
    # --- Tower data cleaning ---
    for i in arg.towerloc:
        tower = utils.savedata.readData("/tower/tower-{}.csv".format(i))

        # Drop columns that are entirely the -99 sentinel.
        all_minus_99 = (tower == -99).all()
        cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
        tower = tower.drop(cols_to_drop, axis=1)

        # Replace the remaining -99 sentinels with NaN, then drop columns
        # that are less than 80% populated.
        # BUGFIX: dropna must run on the NaN-substituted frame; the original
        # applied it to `tower` (still holding -99, no NaN), so the threshold
        # never fired and the sentinels were saved unchanged.
        tower_nan = tower.replace(-99, np.nan, inplace=False)
        tower = tower_nan.dropna(axis=1, thresh=len(tower_nan) * 0.8)
        utils.savedata.saveData("/tower/tower-{}-process.csv".format(i), tower)

    # --- Tower time alignment (only tower 1 in use; tower 2 kept for reference) ---
    tower1 = utils.savedata.readData("/tower/tower-{}-process.csv".format(1))
    # tower2 = utils.savedata.readData("/tower/tower-{}-process.csv".format(2))
    # tower1 = tower1[tower1['C_TIME'].isin(tower2['C_TIME'])]
    # tower2 = tower2[tower2['C_TIME'].isin(tower1['C_TIME'])]
    utils.savedata.saveData("/tower/tower-{}-process.csv".format(1), tower1)
    # utils.savedata.saveData("/tower/tower-{}-process.csv".format(2), tower2)

    # --- Align every table to the overlapping time window ---
    filenames = ["/NWP.csv", "/power.csv", '/tower/tower-1-process.csv']
    dataframes = []
    for i in arg.turbineloc:
        filenames.append("/turbine-15/turbine-{}.csv".format(i))
    for name in filenames:
        dataframes.append(utils.savedata.readData(name))

    # Latest start and earliest end across all tables define the window.
    max_start_time = max(df['C_TIME'].min() for df in dataframes)
    min_end_time = min(df['C_TIME'].max() for df in dataframes)

    print(max_start_time)
    print(min_end_time)

    # Keep only rows inside [max_start_time, min_end_time] and overwrite each file.
    for i, df in enumerate(dataframes):
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # ensure datetime dtype before comparing
        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
        utils.savedata.saveData(filenames[i], df_filtered)


def NWP_indep_process():
    """
    Split the NWP data into N contiguous datasets at every gap larger than
    15 minutes.
    :return: list of DataFrames, one per contiguous segment
    """
    NWP = utils.savedata.readData("NWP.csv")
    df = pd.to_datetime(NWP['C_TIME'])
    time_diff = df.diff()
    time_diff_threshold = pd.Timedelta(minutes=15)
    # Rows whose gap to the previous row exceeds 15 minutes mark segment starts.
    missing_values = df[time_diff > time_diff_threshold]
    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
    print(missing_values)
    utils.savedata.saveVar("NWP_miss.pickle", missing_values)

    # BUGFIX: with no gaps the original raised IndexError on
    # missing_values.index[-1]; a gap-free series is one whole segment.
    if len(missing_values) == 0:
        split_indices = [(0, len(df))]
    else:
        split_indices = []
        for i in range(len(missing_values)):
            if i == 0:
                split_indices.append((0, missing_values.index[i]))
            else:
                split_indices.append((missing_values.index[i - 1], missing_values.index[i]))
        # Final segment runs from the last gap to the end of the series.
        split_indices.append((missing_values.index[-1], len(df)))

    split_datasets = [NWP.iloc[start:end, :] for start, end in split_indices]
    for i, split_df in enumerate(split_datasets):
        utils.savedata.saveData("Dataset_training/NWP/NWP_{}.csv".format(i), split_df)
    return split_datasets


# def power_indep_process():
#     NWP = utils.savedata.readData("power.csv")


def Data_split():
    """
    Unused exploratory helper: prints the size (in 15-minute steps) of each
    gap in the 15-minute power series.
    :return: None
    """
    NWP = utils.savedata.readData("power_15min.csv")
    df = pd.to_datetime(NWP['C_TIME'])
    time_diff = df.diff()
    time_diff_threshold = pd.Timedelta(minutes=15)
    missing_values = df[time_diff > time_diff_threshold]
    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
    print(missing_values)
    NWP_miss = utils.savedata.readVar("NWP_miss.pickle")

    for t in missing_values.index:
        # Gap spans from the row before the gap (a) to the gap row (b).
        a = t - 1
        b = t
        time1 = NWP['C_TIME'][a]
        time2 = NWP['C_TIME'][b]
        df = pd.to_datetime([time1, time2])
        # Gap length expressed in 15-minute steps.
        time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
        print(time_diff)

    time1 = "2022-10-27 14:00:00"
    time2 = "2023-04-16 12:00:00"
    df = pd.to_datetime([time1, time2])
    time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
    print(time_diff)


def time_all_in():
    """
    Unused (per original note): pad each turbine's 15-minute series to a
    complete time grid, filling missing samples with -99.
    NOTE(review): the padded result is computed but never saved or returned —
    confirm intended persistence before wiring this in.
    :return: None
    """
    filenames = []
    dataframes = []
    for i in arg.turbineloc:
        filenames.append("/turbine-15/turbine-{}.csv".format(i))
    for name in filenames:
        dataframes.append(utils.savedata.readData(name))

    for df in dataframes:
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])

        # Build the complete 15-minute index spanning the observed range.
        start_time = df['C_TIME'].min()
        end_time = df['C_TIME'].max()
        full_time_range = pd.date_range(start_time, end_time, freq='15min')

        full_df = pd.DataFrame(index=full_time_range)
        full_df.index.name = 'C_TIME'

        # BUGFIX: the original merged on a nonexistent 'time' column
        # (KeyError); join on the C_TIME index instead.
        merged_df = full_df.join(df.set_index('C_TIME'), how='left')

        # Fill missing samples with -99; C_TIME is the index, so the time
        # values themselves are never overwritten.
        merged_df.fillna(-99, inplace=True)
        merged_df.reset_index(inplace=True)


def data_process(database):
    """
    Orchestrate the full export + preprocessing pipeline.
    :param database: database connection URL
    :return: None
    """
    clear_data()
    get_process_NWP(database)
    get_process_turbine(database)
    get_turbine_info(database)
    get_process_tower(database)
    get_process_power(database)
    indep_process()
    NWP_indep_process()
    # Data_split()


if __name__ == '__main__':
    import os
    import glob

    # Remove top-level CSVs under ../data before plotting.
    folder_path = '../data'
    csv_files = glob.glob(os.path.join(folder_path, '*.csv'))
    for file_path in csv_files:
        os.remove(file_path)

    # database = "mysql+pymysql://root:!QAZ2root@192.168.1.205:3306/ipfcst-sishui-a"
    # engine = create_database(database)
    #
    # # NPW数据
    # sql_NWP = "select C_SC_DATE,C_SC_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
    # NWP = exec_sql(sql_NWP,engine)
    #
    # # 分风机功率
    # sql_wind = "select C_WS,C_ACTIVE_POWER from t_wind_turbine_status_data_15 WHERE C_EQUIPMENT_NO=2 and C_WS>0 and C_ACTIVE_POWER>0 "
    # df_wind = exec_sql(sql_wind,engine)
    # print(df_wind)

    # 总功率数据读取
    # sql_power = "select * from t_power_station_status_data"
    # df_power = exec_sql(sql_power, engine)

    filenames = []
    dataframes = []
    for i in arg.turbineloc:
        filenames.append("../data/turbine-15/turbine-{}.csv".format(i))
    for name in filenames:
        dataframes.append(pd.read_csv(name).iloc[:7000, :])

    # for df in enumerate(dataframes):
    #     df =
    # Mean wind speed across turbines vs. summed active power (MW, as int).
    mean_of_first_columns = pd.concat([df['C_WS'] for df in dataframes], axis=1).mean(axis=1)
    mean_of_second_columns = (pd.concat([df['C_ACTIVE_POWER'] for df in dataframes], axis=1).sum(axis=1) / 1000).astype(int)
    print(len(mean_of_first_columns))

    plt.scatter(mean_of_first_columns, mean_of_second_columns)
    plt.show()