import pandas as pd import datetime, time import re import os import pymysql from sqlalchemy import create_engine import pytz from data_cleaning import cleaning, rm_duplicated, key_field_row_cleaning current_path = os.path.dirname(__file__) dataloc = current_path + '/data/' def readData(name): """ 读取数据 :param name: 名字 :return: """ path = r"./cache/data/" + name return pd.read_csv(path) def saveData(name, data): """ 存放数据 :param name: 名字 :param data: 数据 :return: """ path = r"./cache/data/" + name os.makedirs(os.path.dirname(path), exist_ok=True) data.to_csv(path, index=False) def timestamp_to_datetime(ts): local_timezone = pytz.timezone('Asia/Shanghai') if type(ts) is not int: raise ValueError("timestamp-时间格式必须是整型") if len(str(ts)) == 13: dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone) return dt elif len(str(ts)) == 10: dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone) return dt else: raise ValueError("timestamp-时间格式错误") def dt_tag(dt): date = dt.replace(hour=0, minute=0, second=0) delta = (dt - date) / pd.Timedelta(minutes=15) return delta + 1 def timestr_to_timestamp(time_str): """ 将时间戳或时间字符串转换为datetime.datetime类型 :param time_data: int or str :return:datetime.datetime """ if isinstance(time_str, str): if len(time_str) == 10: dt = datetime.datetime.strptime(time_str, '%Y-%m-%d') return int(round(time.mktime(dt.timetuple())) * 1000) elif len(time_str) in {17, 18, 19}: dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') # strptime字符串解析必须严格按照字符串中的格式 return int(round(time.mktime(dt.timetuple())) * 1000) # 转换成毫秒级的时间戳 else: raise ValueError("时间字符串长度不满足要求!") else: return time_str class DataBase(object): def __init__(self, begin, end, opt, logger): self.begin = begin self.opt = opt # self.his_begin = self.begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4) # self.end = end + pd.Timedelta(days=1) - pd.Timedelta(minutes=15) # self.begin_stamp = timestr_to_timestamp(str(begin)) # self.his_begin_stamp = timestr_to_timestamp(str(self.his_begin)) # self.end_stamp = timestr_to_timestamp(str(self.end)) self.database = opt.database self.logger = logger # self.towerloc = self.opt.tower def clear_data(self): """ 删除所有csv :return: """ # 设置文件夹路径 import glob import os folder_path = dataloc # 使用 glob 获取所有的 .csv 文件路径 csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True) # 遍历所有 .csv 文件并删除 for file_path in csv_files: os.remove(file_path) self.logger.info("清除所有csv文件") def create_database(self): """ 创建数据库连接 :param database: 数据库地址 :return: """ engine = create_engine(self.database) return engine def exec_sql(self, sql, engine): """ 从数据库获取数据 :param sql: sql语句 :param engine: 数据库对象 :return: """ df = pd.read_sql_query(sql, engine) return df def get_process_NWP(self): """ 从数据库中获取NWP数据,并进行简单处理 :param database: :return: """ # NPW数据 engine = self.create_database() sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \ "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \ "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \ "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp" \ " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 风的NWP字段 NWP = self.exec_sql(sql_NWP, engine) NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime) NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'}) # NWP['DT_TAG'] = NWP.apply(lambda x: dt_tag(x['C_TIME']), axis=1) NWP = cleaning(NWP, 'NWP') # NWP = self.split_time(NWP) NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S') saveData("NWP.csv", NWP) self.logger.info("导出nwp数据") return NWP def get_process_tower(self): """ 获取环境检测仪数据 :param database: :return: """ engine = self.create_database() self.logger.info("提取测风塔:{}".format(self.opt.towerloc)) for i in [self.opt.towerloc]: # 删除没用的列 drop_colmns = ["C_ID","C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10","C_IS_GENERATED","C_ABNORMAL_CODE"] get_colmns = [] # # 查询表的所有列名 result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine) tower = pd.read_csv("./cache/data/t_wind_tower_status_data.csv") tower.columns = list(result_set['Field'].values) for name in result_set.iloc[:,0]: if name not in drop_colmns: get_colmns.append(name) all_columns_str = ", ".join([f'{col}' for col in get_colmns]) # tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO="+str(i) + " and C_TIME between '{}' and '{}'".format(self.his_begin, self.end) # tower = self.exec_sql(tower_sql, engine) tower = tower[all_columns_str.split(', ')] # tower.drop(columns=drop_colmns, inplace=True) tower['C_TIME'] = pd.to_datetime(tower['C_TIME']) saveData("/tower-{}.csv".format(i), tower) self.logger.info("测风塔{}导出数据".format(i)) def get_process_power(self): """ 获取整体功率数据 :param database: :return: """ powers = pd.read_csv('./cache/data/t_power_station_status_data.csv') shouzu = pd.read_csv('./cache/data/全场_原始数据_2024-11-23_16-22-42.csv') shouzu.rename(columns={'时间':'C_TIME', '场外受阻发电量': 'SHOUZU'}, inplace=True) shouzu['C_TIME'] = pd.to_datetime(shouzu['C_TIME']) shouzu = shouzu[~(shouzu['SHOUZU'] > 0)] engine = self.create_database() sql_cap = "select C_CAPACITY from t_electric_field" cap = self.exec_sql(sql_cap, engine)['C_CAPACITY'] self.opt.cap = float(cap) result_set = self.exec_sql("SHOW COLUMNS FROM t_power_station_status_data", engine) powers.columns = list(result_set.iloc[:, 0].values) powers['C_TIME'] = pd.to_datetime(powers['C_TIME']) powers = pd.merge(powers, shouzu, on='C_TIME') powers = powers[['C_TIME', 'C_REAL_VALUE', 'C_ABLE_VALUE', 'C_IS_RATIONING_BY_MANUAL_CONTROL', 'C_IS_RATIONING_BY_AUTO_CONTROL']] # if self.opt.usable_power["clean_power_by_signal"]: # sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and C_IS_RATIONING_BY_AUTO_CONTROL=0" powers['C_TIME'] = pd.to_datetime(powers['C_TIME']) mask2 = powers['C_REAL_VALUE'] < 0 mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap) mask = powers['C_REAL_VALUE'] == -99 mask = mask | mask1 | mask2 self.logger.info("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum())) powers = powers[~mask] self.logger.info("剔除完后还剩{}条".format(len(powers))) # binary_map = {b'\x00': 0, b'\x01': 1} # powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map) powers = rm_duplicated(powers) saveData("power_filter4.csv", powers) def get_process_dq(self): """ 获取短期预测结果 :param database: :return: """ engine = self.create_database() sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \ "where C_FORECAST_TIME between {} and {}".format(self.his_begin_stamp, self.end_stamp) dq = self.exec_sql(sql_dq, engine) # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms') dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime) # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1] # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True) dq = cleaning(dq, 'dq', cols=['C_FP_VALUE']) dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S') saveData("dq.csv", dq) def indep_process(self): """ 进一步数据处理:时间统一处理等 :return: """ # 测风塔数据处理 for i in [self.opt.towerloc]: tower = readData("/tower-{}.csv".format(i)) tower = cleaning(tower, 'tower', [self.opt.usable_power["env"]]) tower['C_TIME'] = pd.to_datetime(tower['C_TIME']) tower_ave = tower.resample('15T', on='C_TIME').mean().reset_index() tower_ave = tower_ave.dropna(subset=[self.opt.usable_power['env']]) tower_ave.iloc[:, 1:] = tower_ave.iloc[:, 1:].round(2) saveData("/tower-{}-process.csv".format(i), tower_ave) def get_process_cdq(self): """ 获取超短期预测结果 :param database: :return: """ engine = self.create_database() sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \ " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) cdq = self.exec_sql(sql_cdq, engine) cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime) cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False) # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])] cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S') saveData("cdq.csv", cdq) def get_process_turbine(self): """ 从数据库中获取风头数据,并进行简单处理 :param database: :return: """ cids = [x for x in range(33, 65, 1)] c_names = ['F01', 'F02', 'F03', 'F04', 'F05', 'F06', 'F07', 'F08', 'F09', 'F10']+['F'+str(x) for x in range(10, 32)] id_names = {id: c_names[x] for x, id in enumerate(cids)} turbines = pd.read_csv('./cache/data/turbines.csv') turbines_ori = pd.read_csv('./cache/data/全场_原始数据_2024-08-01_19-40-25.csv') turbines_ori['风机'] = turbines_ori['风机'].apply(lambda x: re.sub(r'\(.*?\)|\[.*?\]|\{.*?\}', '', x)) turbines_ori.rename(columns={"时间": "C_TIME"}, inplace=True) turbines_ori = turbines_ori[['C_TIME', '风机', '低位首触机组运行状态', '高位首触机组运行状态']] turbines_ori['C_TIME'] = pd.to_datetime(turbines_ori['C_TIME']) turbines['C_TIME'] = pd.to_datetime(turbines['C_TIME']) for number in self.opt.turbineloc: # number = self.opt.usable_power['turbine_id'] # 机头数据 turbine = turbines[turbines['C_EQUIPMENT_NO'] == number] turbine_ori = turbines_ori[(turbines_ori['风机'] == id_names[number]) & (turbines_ori['低位首触机组运行状态'] <=3) & (turbines_ori['高位首触机组运行状态'] <=3)] turbine = pd.merge(turbine, turbine_ori.loc[:, ["C_TIME", "风机"]], on="C_TIME") turbine.drop(columns=['风机'], inplace=True) turbine = key_field_row_cleaning(turbine, cols=['C_WS', 'C_ACTIVE_POWER']) turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])] # 直接导出所有数据 saveData("turbine-{}.csv".format(number), turbine) def process_csv_files(self, input_dir, output_dir, M, N): # MBD:没有考虑时间重复 if not os.path.exists(output_dir): os.makedirs(output_dir) for i in self.opt.turbineloc: input_file = os.path.join(input_dir, f"turbine-{i}.csv") output_file = os.path.join(output_dir, f"turbine-{i}.csv") # 读取csv文件 df = pd.read_csv(input_file) # 剔除异常值,并获取异常值统计信息 df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = self.remove_abnormal_values(df, N) # 输出异常值统计信息 self.logger.info(f"处理文件:{input_file}") self.logger.info(f"剔除 -99 点异常值数量:{count_abnormal1}") self.logger.info(f"剔除连续异常值数量:{count_abnormal2}") self.logger.info(f"总共剔除数据量:{total_removed}") self.logger.info(f"剔除的连续异常值具体数值:{removed_continuous_values}\n") # 保存处理过的CSV文件 df_clean.to_csv(output_file, index=False) def remove_abnormal_values(self,df, N): # 标记C_ACTIVE_POWER为-99的行为异常值 abnormal_mask1 = df['C_ACTIVE_POWER'] == -99 count_abnormal1 = abnormal_mask1.sum() # 标记C_WS, A, B连续5行不变的行为异常值 columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER'] abnormal_mask2 = self.mark_abnormal_streaks(df, columns, N) count_abnormal2 = abnormal_mask2.sum() # 获得所有异常值的布尔掩码 abnormal_mask = abnormal_mask1 | abnormal_mask2 # 获取连续异常值具体数值 removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns} # 剔除异常值 df_clean = df[~abnormal_mask] total_removed = abnormal_mask.sum() return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values # ——————————————————————————对分区的风机进行发电功率累加—————————————————————————————— def zone_powers(self, input_dir): z_power = {} for zone, turbines in self.opt.zone.items(): dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv")) for z in self.opt.turbineloc if z in turbines] z_power['C_TIME'] = dfs[0]['C_TIME'] sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1) z_power[zone] = sum_power z_power = pd.DataFrame(z_power) z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2) saveData("z-power-t.csv", z_power) # ——————————————————————————机头风速-99和连续异常值清洗代码—————————————————————————————— def mark_abnormal_streaks(self, df, columns, min_streak): abnormal_mask = pd.Series(False, index=df.index) streak_start = None for i in range(len(df)): if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns): streak_start = i if i - streak_start >= min_streak - 1: abnormal_mask[i - min_streak + 1:i + 1] = True return abnormal_mask # ——————————————————————————风机单机时间对齐—————————————————————————————— def TimeMerge(self, input_dir, output_dir, M): # 读取所有CSV文件 files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in self.opt.turbineloc] dataframes = [pd.read_csv(f) for f in files] # 获取C_TIME列的交集 c_time_intersection = set(dataframes[0]["C_TIME"]) for df in dataframes[1:]: c_time_intersection.intersection_update(df["C_TIME"]) # 只保留C_TIME交集中的数据 filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)] for df in dataframes] # 将每个过滤后的DataFrame写入新的CSV文件 os.makedirs(output_dir, exist_ok=True) for (filtered_df, i) in zip(filtered_dataframes, self.opt.turbineloc): if i == 144: filtered_df['C_ACTIVE_POWER'] /= 1000 filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False) def data_process(self): """ 数据导出+初步处理的总操控代码 :param database: :return: """ # self.clear_data() self.get_process_power() # self.get_process_dq() # self.get_process_cdq() # self.get_process_NWP() # self.get_process_tower() # self.indep_process() self.get_process_turbine() self.process_csv_files('./cache/data', './cache/data', 50, 5) self.TimeMerge('./cache/data', './cache/data', 50) self.zone_powers('./cache/data') if __name__ == '__main__': from logs import Log from config import myargparse import matplotlib.pyplot as plt import matplotlib.colors as mcolors import pandas as pd args = myargparse(discription="场站端配置", add_help=False) opt = args.parse_args_and_yaml() log = Log().logger db = DataBase(begin='', end='', opt=opt, logger=log) db.data_process()