"""Data export and first-stage preprocessing for the power-forecasting pipeline.

Pulls NWP, weather-station, plant-power and forecast tables out of MySQL,
cleans them, aligns their time ranges, and writes csv/pickle artifacts via
``utils.savedata``.
"""
import glob
import os

import pymysql  # noqa: F401 -- MySQL driver required by the sqlalchemy URL
import pandas as pd
import numpy as np
from sqlalchemy import create_engine
import matplotlib.pyplot as plt

plt.rcParams['font.sans-serif'] = ['SimHei']  # render CJK glyphs in figures

import utils.savedata
from utils import Arg
from norm import Normalize

arg = Arg.Arg()
norm = Normalize()


def clear_data():
    """Delete every .csv file under the data directory (arg.dataloc), recursively."""
    csv_files = glob.glob(os.path.join(arg.dataloc, '**/*.csv'), recursive=True)
    for file_path in csv_files:
        os.remove(file_path)
    print("清除所有csv文件")  # BUGFIX: message said "scv" instead of "csv"


def create_database(database):
    """Create a SQLAlchemy engine for the given connection URL.

    :param database: database connection URL
    :return: SQLAlchemy engine
    """
    return create_engine(database)


def exec_sql(sql, engine):
    """Run a SQL query and return the result set as a DataFrame.

    :param sql: SQL statement
    :param engine: SQLAlchemy engine
    :return: pandas.DataFrame with the query result
    """
    return pd.read_sql_query(sql, engine)


def get_process_NWP(database):
    """Fetch NWP (numerical weather prediction) rows and normalize timestamps.

    Converts the epoch-millisecond ``C_PRE_TIME`` column to local-time strings,
    renames it to ``C_TIME``, saves the frame and feeds it to the normalizer.

    :param database: database connection URL
    :return: processed NWP DataFrame
    """
    engine = create_database(database)
    # NWP fields used for forecasting
    sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR, C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
    NWP = exec_sql(sql_NWP, engine)

    # C_PRE_TIME is an epoch in milliseconds: drop the last 3 digits -> seconds
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(str).str[:-3].astype(float)
    NWP['C_PRE_TIME'] = pd.to_datetime(NWP['C_PRE_TIME'], unit='s')
    # epoch is UTC; convert to local wall-clock time and format as a string
    # (string tz names replace the former pytz dependency)
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.tz_localize('UTC').dt.tz_convert('Asia/Shanghai')
    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')

    NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
    utils.savedata.saveData("NWP.csv", NWP)
    norm.normalize(NWP)
    return NWP


def get_process_weather(database):
    """Export each weather-monitor station's data to csv and normalize it.

    :param database: database connection URL
    """
    engine = create_database(database)
    print("现有环境监测仪:{}".format(arg.weatherloc))
    # bookkeeping / status columns that carry no modelling signal
    drop_colmns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1", "C_DATA2", "C_DATA3",
                   "C_DATA4", "C_DATA5", "C_DATA6", "C_DATA7", "C_DATA8",
                   "C_DATA9", "C_DATA10", "C_STATUS", "C_IS_GENERATED",
                   "C_ABNORMAL_CODE"]
    # the column list is table-wide, so query it once instead of per station
    result_set = exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine)
    get_colmns = [name for name in result_set.iloc[:, 0] if name not in drop_colmns]
    all_columns_str = ", ".join(get_colmns)
    for i in arg.weatherloc:
        print("环境监测仪{}导出数据".format(i))
        # NOTE(review): station ids come from trusted config, so plain string
        # concatenation is tolerated; parameterize if ids ever become user input
        weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO=" + str(i)
        weather = exec_sql(weather_sql, engine)
        utils.savedata.saveData("weather/weather-{}.csv".format(i), weather)
        norm.normalize(weather)


def get_process_power(database):
    """Export plant power, drop invalid samples, and derive a 5-minute series.

    :param database: database connection URL
    """
    engine = create_database(database)
    sql_cap = "select C_CAPACITY from t_electric_field"
    cap = exec_sql(sql_cap, engine)['C_CAPACITY']
    capacity = float(cap.iloc[0])  # explicit scalar: float(Series) is deprecated
    sql_power = "select C_TIME,C_REAL_VALUE from t_power_station_status_data"
    powers = exec_sql(sql_power, engine)

    # invalid samples: above installed capacity, or the -99 missing marker
    mask = (powers['C_REAL_VALUE'] > capacity) | (powers['C_REAL_VALUE'] == -99)
    print("要剔除功率有{}条".format(mask.sum()))
    powers = powers[~mask]
    utils.savedata.saveData("power.csv", powers)

    # Down-sample to 5-minute buckets: accumulate raw values and emit the mean
    # whenever a sample's minute ends in 0 or 5.
    power5, power_index = [], [0]  # bucketed rows; index labels of bucket ends
    ps = 0
    for i, power in powers.iterrows():
        ps += power['C_REAL_VALUE']
        # NOTE(review): assumes C_TIME arrives datetime-like from the driver — confirm
        if str(power['C_TIME'].minute)[-1] in ('0', '5'):
            power_index.append(i)
            # NOTE(review): divisor is a difference of original index labels,
            # so rows removed by the mask above inflate it — confirm intent
            num = power_index[-1] - power_index[-2]
            num = num if num != 0 else 1
            power5.append([power['C_TIME'], round(ps / num, 2)])
            ps = 0
    power5 = pd.DataFrame(power5, columns=['C_TIME', 'C_REAL_VALUE'])
    utils.savedata.saveData("power5.csv", power5)
    norm.normalize(power5)


def get_process_dq(database):
    """Export short-term forecast history; epoch-ms timestamps become datetimes.

    :param database: database connection URL
    """
    engine = create_database(database)
    sql_dq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_short_term_his"
    dq = exec_sql(sql_dq, engine)
    dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
    utils.savedata.saveData("dq.csv", dq)


def get_process_cdq(database):
    """Export ultra-short-term forecast history; epoch-ms timestamps become datetimes.

    :param database: database connection URL
    """
    engine = create_database(database)
    sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_ultra_short_term_his"
    cdq = exec_sql(sql_cdq, engine)
    cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'], unit='ms')
    utils.savedata.saveData("cdq.csv", cdq)


def indep_process():
    """Second-stage cleanup.

    Prunes degenerate weather columns (all -99, mostly missing, or constant),
    then clips every exported table to the common overlapping time window.
    """
    for i in arg.weatherloc:
        weather = utils.savedata.readData("/weather/weather-{}.csv".format(i))
        # drop columns that consist entirely of the -99 missing marker
        all_minus_99 = (weather == -99).all()
        cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
        weather = weather.drop(cols_to_drop, axis=1)
        # view the -99 markers as NaN, then drop columns with >20% missing
        weather_nan = weather.replace(-99, np.nan, inplace=False)
        # BUGFIX: the threshold must run on the NaN view; the original called
        # dropna on `weather`, which still held -99, so nothing was ever dropped
        weather = weather_nan.dropna(axis=1, thresh=len(weather_nan) * 0.8)
        weather = weather.replace(np.nan, -99, inplace=False)
        # drop constant columns (no variance, no signal)
        weather = weather.loc[:, (weather != weather.iloc[0]).any()]
        utils.savedata.saveData("/weather/weather-{}-process.csv".format(i), weather)

    # round-trip kept from the original per-station time-alignment step
    weather1 = utils.savedata.readData("/weather/weather-{}-process.csv".format(1))
    utils.savedata.saveData("/weather/weather-{}-process.csv".format(1), weather1)

    # clip all exported tables to [max of start times, min of end times]
    filenames = ["/NWP.csv", "/power.csv", "power5.csv", "/dq.csv", "/cdq.csv",
                 '/weather/weather-1-process.csv']
    dataframes = [utils.savedata.readData(name) for name in filenames]
    max_start_time = max(df['C_TIME'].min() for df in dataframes)
    min_end_time = min(df['C_TIME'].max() for df in dataframes)
    print(max_start_time)
    print(min_end_time)
    for i, df in enumerate(dataframes):
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # ensure datetime before filtering
        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
        utils.savedata.saveData(filenames[i], df_filtered)


def NWP_indep_process():
    """Split the NWP table into contiguous sub-datasets at every gap > 15 min.

    :return: list of DataFrame slices, one per contiguous segment
    """
    NWP = utils.savedata.readData("NWP.csv")
    times = pd.to_datetime(NWP['C_TIME'])
    # rows whose predecessor is more than one 15-minute step away mark a gap
    missing_values = times[times.diff() > pd.Timedelta(minutes=15)]
    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
    print(missing_values)
    utils.savedata.saveVar("NWP_miss.pickle", missing_values)

    # Build (start, end) iloc ranges between consecutive gap positions.
    # NOTE(review): index *labels* are used as positions — valid only while
    # NWP keeps its default RangeIndex (true for a fresh csv read).
    if len(missing_values) == 0:
        # BUGFIX: the original indexed missing_values.index[-1] unconditionally
        # and raised IndexError on gap-free data
        split_indices = [(0, len(times))]
    else:
        boundaries = [0] + list(missing_values.index) + [len(times)]
        split_indices = list(zip(boundaries[:-1], boundaries[1:]))

    split_datasets = [NWP.iloc[start:end, :] for start, end in split_indices]
    for i, split_df in enumerate(split_datasets):
        utils.savedata.saveData("Dataset_training/NWP/NWP_{}.csv".format(i), split_df)
    return split_datasets


def Data_split():
    """(Unused) Inspect gaps in the 15-minute power data.

    Prints each gap's length expressed in 15-minute steps.
    """
    NWP = utils.savedata.readData("power_15min.csv")
    stamps = pd.to_datetime(NWP['C_TIME'])
    missing_values = stamps[stamps.diff() > pd.Timedelta(minutes=15)]
    print("NWP数据缺失的数量为:{}".format(len(missing_values)))
    print(missing_values)
    NWP_miss = utils.savedata.readVar("NWP_miss.pickle")  # loaded for inspection only
    for t in missing_values.index:
        # the gap runs from row t-1 to row t
        pair = pd.to_datetime([NWP['C_TIME'][t - 1], NWP['C_TIME'][t]])
        print((pair[1] - pair[0]) / pd.Timedelta(minutes=15))
    # ad-hoc sanity check of the gap arithmetic on two fixed timestamps
    pair = pd.to_datetime(["2022-10-27 14:00:00", "2023-04-16 12:00:00"])
    print((pair[1] - pair[0]) / pd.Timedelta(minutes=15))


def time_all_in():
    """(Unused) Pad per-turbine 15-min data onto a complete time grid.

    Missing rows are filled with -99; the time column itself stays intact.
    """
    filenames = ["/turbine-15/turbine-{}.csv".format(i) for i in arg.turbineloc]
    dataframes = [utils.savedata.readData(name) for name in filenames]
    for df in dataframes:
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])
        # complete 15-minute grid spanning the observed range
        full_time_range = pd.date_range(df['C_TIME'].min(), df['C_TIME'].max(), freq='15min')
        # BUGFIX: the original merged on a nonexistent 'time' column (KeyError);
        # reindexing on the full grid implements the intended padding
        merged_df = df.set_index('C_TIME').reindex(full_time_range)
        merged_df.index.name = 'C_TIME'
        merged_df.fillna(-99, inplace=True)
        merged_df.reset_index(inplace=True)


def data_process(database):
    """Master routine: wipe old csvs, export all tables, post-process, and
    persist the normalization statistics.

    :param database: database connection URL
    """
    clear_data()
    get_process_power(database)
    get_process_dq(database)
    get_process_cdq(database)
    get_process_NWP(database)
    get_process_weather(database)
    indep_process()
    NWP_indep_process()
    norm.save_yml({'mean': norm.mean, 'std': norm.std}, arg.normloc)


if __name__ == '__main__':
    # wipe previously exported csvs under ../data (non-recursive, as before)
    for file_path in glob.glob(os.path.join('../data', '*.csv')):
        os.remove(file_path)

    # scatter of mean wind speed vs. total active power across all turbines
    filenames = ["../data/turbine-15/turbine-{}.csv".format(i) for i in arg.turbineloc]
    dataframes = [pd.read_csv(name).iloc[:7000, :] for name in filenames]
    mean_of_first_columns = pd.concat([df['C_WS'] for df in dataframes], axis=1).mean(axis=1)
    mean_of_second_columns = (pd.concat([df['C_ACTIVE_POWER'] for df in dataframes],
                                        axis=1).sum(axis=1) / 1000).astype(int)
    print(len(mean_of_first_columns))
    plt.scatter(mean_of_first_columns, mean_of_second_columns)
    plt.show()