import datetime
import glob
import os
import time

import pandas as pd
import pymysql  # MySQL driver; required if the connection URL uses mysql+pymysql
import pytz
from sqlalchemy import create_engine

from cache.data_cleaning import cleaning, rm_duplicated

current_path = os.path.dirname(__file__)
dataloc = os.path.join(current_path, 'data')
def readData(name):
    """
    Read a csv file from the data directory.
    :param name: file name
    :return: pandas.DataFrame
    """
    path = os.path.join(dataloc, name)
    return pd.read_csv(path)
def saveData(name, data):
    """
    Save data to the data directory.
    :param name: file name
    :param data: DataFrame to save
    :return:
    """
    path = os.path.join(dataloc, name)
    os.makedirs(os.path.dirname(path), exist_ok=True)
    data.to_csv(path, index=False)
def timestamp_to_datetime(ts):
    """
    Convert a 10-digit (second) or 13-digit (millisecond) Unix timestamp
    to an Asia/Shanghai datetime.
    """
    local_timezone = pytz.timezone('Asia/Shanghai')
    if not pd.api.types.is_integer(ts):  # also accepts numpy integers coming from read_sql
        raise ValueError("timestamp must be an integer")
    if len(str(ts)) == 13:  # millisecond timestamp
        dt = datetime.datetime.fromtimestamp(ts / 1000, tz=pytz.utc).astimezone(local_timezone)
        return dt
    elif len(str(ts)) == 10:  # second timestamp
        dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
        return dt
    else:
        raise ValueError("unsupported timestamp length")
def dt_tag(dt):
    """
    Map a datetime to its 15-minute slot index within the day
    (00:00 -> 1, 00:15 -> 2, ..., 23:45 -> 96).
    """
    date = dt.replace(hour=0, minute=0, second=0, microsecond=0)
    delta = (dt - date) / pd.Timedelta(minutes=15)
    return delta + 1
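# Illustration only (assumes naive timestamps on the 15-minute grid):
# dt_tag(pd.Timestamp('2024-01-01 00:00')) == 1.0
# dt_tag(pd.Timestamp('2024-01-01 12:30')) == 51.0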
def timestr_to_timestamp(time_str):
    """
    Convert a time string to a millisecond Unix timestamp.
    :param time_str: str ('%Y-%m-%d' or '%Y-%m-%d %H:%M:%S'); non-str values pass through unchanged
    :return: int (milliseconds)
    """
    if isinstance(time_str, str):
        if len(time_str) == 10:
            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
            return int(round(time.mktime(dt.timetuple())) * 1000)
        elif len(time_str) in {17, 18, 19}:
            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')  # strptime requires an exact format match
            return int(round(time.mktime(dt.timetuple())) * 1000)  # millisecond timestamp
        else:
            raise ValueError("unsupported time-string length")
    else:
        return time_str
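# Illustration only; the result depends on the machine's local timezone
# (here assumed to be Asia/Shanghai, as elsewhere in this module):
# timestr_to_timestamp('2024-01-01')          -> 1704038400000
# timestr_to_timestamp('2024-01-01 00:00:00') -> 1704038400000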
class DataBase(object):
    def __init__(self, begin, end, opt, logger):
        self.begin = begin
        self.opt = opt
        # extend the window backwards by his_points quarter-hour steps
        self.his_begin = self.begin - pd.Timedelta(hours=self.opt.Model["his_points"] / 4)
        # extend `end` to the last 15-minute point of that day (23:45)
        self.end = end + pd.Timedelta(days=1) - pd.Timedelta(minutes=15)
        self.begin_stamp = timestr_to_timestamp(str(begin))
        self.his_begin_stamp = timestr_to_timestamp(str(self.his_begin))
        self.end_stamp = timestr_to_timestamp(str(self.end))
        self.database = opt.database
        self.logger = logger
        self.towerloc = self.opt.tower
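        # Illustration only: with begin=2024-01-01, end=2024-01-31 and
        # his_points=16, his_begin becomes 2023-12-31 20:00 (4 h of history)
        # and end becomes 2024-01-31 23:45, the last 15-minute point of the day.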
    def clear_data(self):
        """
        Delete every csv file under the data directory.
        :return:
        """
        # collect all .csv paths recursively, then remove them
        csv_files = glob.glob(os.path.join(dataloc, '**/*.csv'), recursive=True)
        for file_path in csv_files:
            os.remove(file_path)
        self.logger.info("removed all csv files")
    def create_database(self):
        """
        Create a database engine from the configured connection URL.
        :return: sqlalchemy engine
        """
        engine = create_engine(self.database)
        return engine
    def exec_sql(self, sql, engine):
        """
        Run a query and return the result as a DataFrame.
        :param sql: sql statement
        :param engine: database engine
        :return: pandas.DataFrame
        """
        df = pd.read_sql_query(sql, engine)
        return df
    def get_process_NWP(self):
        """
        Fetch NWP data from the database and apply basic processing.
        :return:
        """
        # NWP data
        engine = self.create_database()
        if self.opt.new_field:
            sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR, C_TPR," \
                      "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, C_SOLAR_ZENITH," \
                      "C_LCC, C_MCC, C_HCC, C_TCC, C_CLEARSKY_GHI, C_DNI_CALCD," \
                      "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
                      "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \
                      " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)  # new NWP fields
        else:
            sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
                      "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \
                      "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
                      "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \
                      " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)  # legacy NWP fields
        NWP = self.exec_sql(sql_NWP, engine)
        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
        # NWP['DT_TAG'] = NWP.apply(lambda x: dt_tag(x['C_TIME']), axis=1)
        NWP = cleaning(NWP, 'NWP', self.logger)
        # NWP = self.split_time(NWP)
        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        saveData("NWP.csv", NWP)
        self.logger.info("exported NWP data")
        return NWP
    def get_process_tower(self):
        """
        Fetch wind-tower (met mast) data.
        :return:
        """
        engine = self.create_database()
        self.logger.info("extracting wind towers: {}".format(self.towerloc))
        for i in self.towerloc:
            # columns to exclude from the export
            drop_columns = ["C_ID", "C_DATA1", "C_DATA2", "C_DATA3", "C_DATA4", "C_DATA5", "C_DATA6", "C_DATA7",
                            "C_DATA8", "C_DATA9", "C_DATA10", "C_IS_GENERATED", "C_ABNORMAL_CODE"]
            get_columns = []
            # list all column names of the table
            result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
            for name in result_set.iloc[:, 0]:
                if name not in drop_columns:
                    get_columns.append(name)
            all_columns_str = ", ".join(get_columns)
            tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO=" + str(i) + \
                        " and C_TIME between '{}' and '{}'".format(self.his_begin, self.end)
            tower = self.exec_sql(tower_sql, engine)
            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
            saveData("tower-{}.csv".format(i), tower)
            self.logger.info("exported data for tower {}".format(i))
    def get_process_power(self):
        """
        Fetch the station-level power data.
        :return:
        """
        engine = self.create_database()
        sql_cap = "select C_CAPACITY from t_electric_field"
        cap = self.exec_sql(sql_cap, engine)['C_CAPACITY'].iloc[0]
        self.opt.cap = float(cap)
        sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_REFERENCE_POWER_BY_SAMPLE," \
                    "C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL from t_power_station_status_data " \
                    "where C_TIME between '{}' and '{}'".format(self.his_begin, self.end)
        powers = self.exec_sql(sql_power, engine)
        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
        # drop rows whose target is negative, above capacity, or flagged with -99
        mask2 = powers[self.opt.predict] < 0
        mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap)
        mask = powers['C_REAL_VALUE'] == -99
        mask = mask | mask1 | mask2
        self.logger.info("{} power rows in total, {} to be removed".format(len(powers), mask.sum()))
        powers = powers[~mask]
        self.logger.info("{} rows left after removal".format(len(powers)))
        # the auto-rationing flag is stored as a single byte; map it to 0/1
        binary_map = {b'\x00': 0, b'\x01': 1}
        powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
        powers = rm_duplicated(powers, self.logger)
        saveData("power.csv", powers)
    def get_process_dq(self):
        """
        Fetch the short-term forecast results.
        :return:
        """
        engine = self.create_database()
        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
                 "where C_FORECAST_TIME between {} and {}".format(self.his_begin_stamp, self.end_stamp)
        dq = self.exec_sql(sql_dq, engine)
        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
        dq = cleaning(dq, 'dq', self.logger, cols=['C_FP_VALUE'])
        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        saveData("dq.csv", dq)
    def indep_process(self):
        """
        Further processing: align and regularize the time axis, etc.
        :return:
        """
        # wind-tower data processing
        for i in self.towerloc:
            tower = readData("tower-{}.csv".format(i))
            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
            tower = cleaning(tower, 'tower', self.logger, [self.opt.usable_power['env']])
            # average onto the 15-minute grid, then interpolate and fill remaining gaps
            tower_ave = tower.resample('15min', on='C_TIME').mean().reset_index()
            tower_ave = tower_ave.dropna(subset=[self.opt.usable_power['env']])
            tower_ave.set_index('C_TIME', inplace=True)
            tower_ave = tower_ave.interpolate(method='linear').ffill().bfill()
            tower_ave.reset_index(drop=False, inplace=True)
            tower_ave = tower_ave.dropna(subset=[self.opt.usable_power['env']])
            tower_ave.iloc[:, 1:] = tower_ave.iloc[:, 1:].round(2)
            saveData("tower-{}-process.csv".format(i), tower_ave)
    def get_process_cdq(self):
        """
        Fetch the ultra-short-term forecast results.
        :return:
        """
        engine = self.create_database()
        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \
                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
        cdq = self.exec_sql(sql_cdq, engine)
        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
        cdq = cleaning(cdq, 'cdq', self.logger, cols=['C_ABLE_VALUE'], dup=False)
        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
        saveData("cdq.csv", cdq)
    def get_process_turbine(self):
        """
        Fetch per-turbine data from the database and apply basic processing.
        :return:
        """
        for number in self.opt.turbineloc:
            # number = self.opt.usable_power['turbine_id']
            # turbine data
            engine = self.create_database()
            self.logger.info("exporting data for turbine {}".format(number))
            sql_turbine = "select C_TIME, C_WS, C_WD, C_ACTIVE_POWER from t_wind_turbine_status_data " \
                          "WHERE C_EQUIPMENT_NO=" + str(number) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)  # + " and C_WS>0 and C_ACTIVE_POWER>0"
            turbine = self.exec_sql(sql_turbine, engine)
            turbine['C_TIME'] = pd.to_datetime(turbine['C_TIME'])  # ensure datetime dtype before using the .dt accessor
            turbine = cleaning(turbine, 'turbine-' + str(number), self.logger, cols=['C_WS', 'C_ACTIVE_POWER'], dup=False)
            # keep only points on the 15-minute grid
            turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
            # export everything as-is
            saveData("turbine-{}.csv".format(number), turbine)
    def process_csv_files(self, input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled; M is currently unused
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        for i in self.opt.turbineloc:
            input_file = os.path.join(input_dir, f"turbine-{i}.csv")
            output_file = os.path.join(output_dir, f"turbine-{i}.csv")
            # read the csv file
            df = pd.read_csv(input_file)
            # drop abnormal values and collect statistics about them
            df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = self.remove_abnormal_values(df, N)
            # log the statistics
            self.logger.info(f"processing file: {input_file}")
            self.logger.info(f"-99 outliers removed: {count_abnormal1}")
            self.logger.info(f"constant-streak outliers removed: {count_abnormal2}")
            self.logger.info(f"total rows removed: {total_removed}")
            self.logger.info(f"values of the removed streaks: {removed_continuous_values}\n")
            # save the processed csv
            df_clean.to_csv(output_file, index=False)
    def remove_abnormal_values(self, df, N):
        # mark rows where C_ACTIVE_POWER is -99 as abnormal
        abnormal_mask1 = df['C_ACTIVE_POWER'] == -99
        count_abnormal1 = abnormal_mask1.sum()
        # mark rows where C_WS, C_WD and C_ACTIVE_POWER stay constant for N consecutive rows
        columns = ['C_WS', 'C_WD', 'C_ACTIVE_POWER']
        abnormal_mask2 = self.mark_abnormal_streaks(df, columns, N)
        count_abnormal2 = abnormal_mask2.sum()
        # combined boolean mask of all abnormal rows
        abnormal_mask = abnormal_mask1 | abnormal_mask2
        # record the distinct values of the flagged streaks
        removed_continuous_values = {column: df.loc[abnormal_mask2, column].unique() for column in columns}
        # drop the abnormal rows
        df_clean = df[~abnormal_mask]
        total_removed = abnormal_mask.sum()
        return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values
    # —————————————————————————— accumulate generated power over the turbines of each zone ——————————————————————————
    def zone_powers(self, input_dir):
        z_power = {}
        for zone, turbines in self.opt.zone.items():
            dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv")) for z in self.opt.turbineloc if z in turbines]
            # timestamps are aligned across turbines by TimeMerge, so take them from the first file
            z_power['C_TIME'] = dfs[0]['C_TIME']
            sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1)
            z_power[zone] = sum_power
        z_power = pd.DataFrame(z_power)
        z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
        saveData("z-power.csv", z_power)
    # —————————————————————————— cleaning of -99 readings and constant streaks in turbine data ——————————————————————————
    def mark_abnormal_streaks(self, df, columns, min_streak):
        """
        Mark rows where all `columns` hold the same values for at least
        `min_streak` consecutive rows. Assumes a default integer index.
        """
        abnormal_mask = pd.Series(False, index=df.index)
        streak_start = None
        for i in range(len(df)):
            # a new streak starts whenever any column changes value
            if i == 0 or any(df.at[i - 1, col] != df.at[i, col] for col in columns):
                streak_start = i
            if i - streak_start >= min_streak - 1:
                abnormal_mask[i - min_streak + 1:i + 1] = True
        return abnormal_mask
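    # Illustration only: with min_streak=3, a value repeated three times in all
    # three columns is flagged, while the changing tail rows are kept.
    # df = pd.DataFrame({'C_WS': [5.0, 5.0, 5.0, 6.1, 7.2],
    #                    'C_WD': [90, 90, 90, 95, 100],
    #                    'C_ACTIVE_POWER': [1.2, 1.2, 1.2, 1.5, 1.8]})
    # mark_abnormal_streaks(df, ['C_WS', 'C_WD', 'C_ACTIVE_POWER'], 3)
    # -> [True, True, True, False, False]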
    # —————————————————————————— align individual turbines on a common time axis ——————————————————————————
    def TimeMerge(self, input_dir, output_dir, M):  # M is currently unused; kept for call compatibility
        # read every turbine csv
        files = [os.path.join(input_dir, f"turbine-{i}.csv") for i in self.opt.turbineloc]
        dataframes = [pd.read_csv(f) for f in files]
        # intersect the C_TIME columns
        c_time_intersection = set(dataframes[0]["C_TIME"])
        for df in dataframes[1:]:
            c_time_intersection.intersection_update(df["C_TIME"])
        # keep only timestamps present in every file; copy to avoid SettingWithCopy below
        filtered_dataframes = [df[df["C_TIME"].isin(c_time_intersection)].copy() for df in dataframes]
        # write each filtered DataFrame to a new csv
        os.makedirs(output_dir, exist_ok=True)
        for (filtered_df, i) in zip(filtered_dataframes, self.opt.turbineloc):
            if i == 144:
                filtered_df['C_ACTIVE_POWER'] /= 1000  # unit correction for turbine 144
            filtered_df.to_csv(os.path.join(output_dir, f"turbine-{i}.csv"), index=False)
    def data_process(self):
        """
        Top-level driver: export the raw tables and run the initial processing.
        :return:
        """
        self.clear_data()
        self.get_process_power()
        self.get_process_dq()
        self.get_process_cdq()
        self.get_process_NWP()
        self.get_process_tower()
        self.indep_process()
        self.get_process_turbine()
        self.process_csv_files('./cache/data', './cache/data', 50, 5)
        self.TimeMerge('./cache/data', './cache/data', 50)
        self.zone_powers('./cache/data')
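
# A minimal usage sketch, assuming the real `opt` config object is loaded
# elsewhere; every value below (connection URL, equipment ids, column names)
# is a placeholder for illustration, not a real deployment setting.
if __name__ == "__main__":
    import logging
    from types import SimpleNamespace

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("dataloader")
    opt = SimpleNamespace(
        database="mysql+pymysql://user:password@localhost:3306/ipfcst",  # placeholder DSN
        Model={"his_points": 16},                       # 16 quarter-hours = 4 h of history
        tower=[1],                                      # placeholder met-mast equipment ids
        turbineloc=[101, 102],                          # placeholder turbine equipment ids
        zone={"zone1": [101, 102]},                     # zone -> turbine ids
        new_field=True,
        predict="C_REAL_VALUE",
        usable_power={"env": "C_WS_HUB_HEIGHT"},        # placeholder tower column name
    )
    db = DataBase(begin=pd.Timestamp("2024-01-01"), end=pd.Timestamp("2024-01-31"),
                  opt=opt, logger=logger)
    db.data_process()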