@@ -2,13 +2,14 @@ import pandas as pd
 import datetime, time
 import re
 import os
-import pymysql
+import argparse
 from sqlalchemy import create_engine
 import pytz
 from data_cleaning import cleaning, rm_duplicated, key_field_row_cleaning

-current_path = os.path.dirname(__file__)
-dataloc = current_path + '/data/'
+# current_path = os.path.dirname(__file__)
+# dataloc = current_path + '/data/'
+station_id = '260'
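+# station_id selects the per-station cache directory (../../cluster/<station_id>/) used by readData/saveData below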

 def readData(name):
     """
@@ -16,7 +17,7 @@ def readData(name):
     :param name: file name
     :return:
     """
-    path = r"./cache/data/" + name
+    path = rf"../../cluster/{station_id}/" + name
     return pd.read_csv(path)

@@ -27,7 +28,7 @@ def saveData(name, data):
     :param data: data to save
     :return:
     """
-    path = r"./cache/data/" + name
+    path = rf"../../cluster/{station_id}/" + name
     os.makedirs(os.path.dirname(path), exist_ok=True)
     data.to_csv(path, index=False)
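+    # e.g. saveData("power.csv", df) resolves to ../../cluster/260/power.csv while station_id is '260'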

@@ -70,17 +71,14 @@ def timestr_to_timestamp(time_str):


 class DataBase(object):
-    def __init__(self, begin, end, opt, logger):
+    def __init__(self, begin, end, opt):
         self.begin = begin
+        self.end = end
         self.opt = opt
-        # self.his_begin = self.begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)
-        # self.end = end + pd.Timedelta(days=1) - pd.Timedelta(minutes=15)
-        # self.begin_stamp = timestr_to_timestamp(str(begin))
-        # self.his_begin_stamp = timestr_to_timestamp(str(self.his_begin))
-        # self.end_stamp = timestr_to_timestamp(str(self.end))
+        self.begin_stamp = timestr_to_timestamp(str(begin))
+        self.end_stamp = timestr_to_timestamp(str(end))
         self.database = opt.database
-        self.logger = logger
-        # self.towerloc = self.opt.tower
+        self.dataloc = opt.dataloc
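+        # begin/end are date strings; the *_stamp fields keep the numeric epoch form returned by
+        # timestr_to_timestamp (above), previously used for timestamp-keyed queries such as t_nwp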

    def clear_data(self):
         """
@@ -90,7 +88,7 @@ class DataBase(object):
         # set the folder path
         import glob
         import os
-        folder_path = dataloc
+        folder_path = self.dataloc

         # use glob to collect every .csv file path
         csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
@@ -98,7 +96,7 @@ class DataBase(object):
         # iterate over the .csv files and delete them
         for file_path in csv_files:
             os.remove(file_path)
-        self.logger.info("removed all csv files")
+        print("removed all csv files")

     def create_database(self):
         """
@@ -119,181 +117,51 @@ class DataBase(object):
         df = pd.read_sql_query(sql, engine)
         return df

-    def get_process_NWP(self):
-        """
-        Fetch NWP data from the database and lightly process it
-        :param database:
-        :return:
-        """
-        # NWP data
-        engine = self.create_database()
-        sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
-                  "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \
-                  "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
-                  "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp" \
-                  " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)  # wind NWP fields
-        NWP = self.exec_sql(sql_NWP, engine)
-
-        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
-
-        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
-        # NWP['DT_TAG'] = NWP.apply(lambda x: dt_tag(x['C_TIME']), axis=1)
-        NWP = cleaning(NWP, 'NWP')
-        # NWP = self.split_time(NWP)
-        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
-        saveData("NWP.csv", NWP)
-        self.logger.info("exported NWP data")
-        return NWP
-
-    def get_process_tower(self):
-        """
-        Fetch the wind-tower (environment monitor) data
-        :param database:
-        :return:
-        """
-        engine = self.create_database()
-        self.logger.info("extracting wind tower: {}".format(self.opt.towerloc))
-        for i in [self.opt.towerloc]:
-            # drop the unused columns
-            drop_colmns = ["C_ID", "C_DATA1", "C_DATA2", "C_DATA3", "C_DATA4", "C_DATA5", "C_DATA6", "C_DATA7", "C_DATA8", "C_DATA9", "C_DATA10", "C_IS_GENERATED", "C_ABNORMAL_CODE"]
-
-            get_colmns = []
-            # # query all column names of the table
-            result_set = self.exec_sql("SHOW COLUMNS FROM t_wind_tower_status_data", engine)
-
-            tower = pd.read_csv("./cache/data/t_wind_tower_status_data.csv")
-            tower.columns = list(result_set['Field'].values)
-
-            for name in result_set.iloc[:, 0]:
-                if name not in drop_colmns:
-                    get_colmns.append(name)
-
-            all_columns_str = ", ".join([f'{col}' for col in get_colmns])
-            # tower_sql = "select " + all_columns_str + " from t_wind_tower_status_data where C_EQUIPMENT_NO=" + str(i) + " and C_TIME between '{}' and '{}'".format(self.his_begin, self.end)
-            # tower = self.exec_sql(tower_sql, engine)
-            tower = tower[all_columns_str.split(', ')]
-            # tower.drop(columns=drop_colmns, inplace=True)
-            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
-            saveData("/tower-{}.csv".format(i), tower)
-            self.logger.info("wind tower {} data exported".format(i))
-
     def get_process_power(self):
         """
         Fetch the plant-level power data
         :param database:
         :return:
         """
-        powers = pd.read_csv('./cache/data/t_power_station_status_data.csv')
-        shouzu = pd.read_csv('./cache/data/全场_原始数据_2024-11-23_16-22-42.csv')
-        shouzu.rename(columns={'时间': 'C_TIME', '场外受阻发电量': 'SHOUZU'}, inplace=True)
-        shouzu['C_TIME'] = pd.to_datetime(shouzu['C_TIME'])
-        shouzu = shouzu[~(shouzu['SHOUZU'] > 0)]
-
-
         engine = self.create_database()
         sql_cap = "select C_CAPACITY from t_electric_field"
         cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
-        self.opt.cap = float(cap)
-
-        result_set = self.exec_sql("SHOW COLUMNS FROM t_power_station_status_data", engine)
-        powers.columns = list(result_set.iloc[:, 0].values)
-        powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
-        powers = pd.merge(powers, shouzu, on='C_TIME')
-        powers = powers[['C_TIME', 'C_REAL_VALUE', 'C_ABLE_VALUE', 'C_IS_RATIONING_BY_MANUAL_CONTROL', 'C_IS_RATIONING_BY_AUTO_CONTROL']]
-
-        # if self.opt.usable_power["clean_power_by_signal"]:
-        #     sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and C_IS_RATIONING_BY_AUTO_CONTROL=0"
+        sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
+                    " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
+        powers = self.exec_sql(sql_power, engine)
         powers['C_TIME'] = pd.to_datetime(powers['C_TIME'])
-        mask2 = powers['C_REAL_VALUE'] < 0
         mask1 = powers['C_REAL_VALUE'].astype(float) > float(cap)
         mask = powers['C_REAL_VALUE'] == -99

-        mask = mask | mask1 | mask2
-        self.logger.info("actual power: {} rows in total, {} to drop".format(len(powers), mask.sum()))
+        mask = mask | mask1
+        print("actual power: {} rows in total, {} to drop".format(len(powers), mask.sum()))
        powers = powers[~mask]
-        self.logger.info("{} rows left after filtering".format(len(powers)))
-        # binary_map = {b'\x00': 0, b'\x01': 1}
-        # powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
+        print("{} rows left after filtering".format(len(powers)))
+        binary_map = {b'\x00': 0, b'\x01': 1}
+        powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
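+        # MySQL BIT(1) flags come back from the driver as single bytes (b'\x00'/b'\x01');
+        # the map above converts them to plain 0/1 integers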
         powers = rm_duplicated(powers)
-        saveData("power_filter4.csv", powers)
+        saveData("power.csv", powers)

-    def get_process_dq(self):
-        """
-        Fetch the short-term forecast results
-        :param database:
-        :return:
-        """
-        engine = self.create_database()
-        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
-                 "where C_FORECAST_TIME between {} and {}".format(self.his_begin_stamp, self.end_stamp)
-        dq = self.exec_sql(sql_dq, engine)
-        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
-        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
-        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
-        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
-        dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
-        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
-        saveData("dq.csv", dq)
-
-    def indep_process(self):
-        """
-        Further processing: unify the timestamps, etc.
-        :return:
-        """
-        # wind-tower data processing
-        for i in [self.opt.towerloc]:
-            tower = readData("/tower-{}.csv".format(i))
-            tower = cleaning(tower, 'tower', [self.opt.usable_power["env"]])
-
-            tower['C_TIME'] = pd.to_datetime(tower['C_TIME'])
-            tower_ave = tower.resample('15T', on='C_TIME').mean().reset_index()
-            tower_ave = tower_ave.dropna(subset=[self.opt.usable_power['env']])
-            tower_ave.iloc[:, 1:] = tower_ave.iloc[:, 1:].round(2)
-            saveData("/tower-{}-process.csv".format(i), tower_ave)
-
-    def get_process_cdq(self):
-        """
-        Fetch the ultra-short-term forecast results
-        :param database:
-        :return:
-        """
-        engine = self.create_database()
-        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from t_forecast_power_ultra_short_term_his" \
-                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
-        cdq = self.exec_sql(sql_cdq, engine)
-        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
-        cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
-        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
-        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
-        saveData("cdq.csv", cdq)
-
-    def get_process_turbine(self):
+    def get_process_turbine(self, output_dir):
         """
         Fetch wind-turbine data from the database and lightly process it
         :param database:
         :return:
         """
-        cids = [x for x in range(33, 65, 1)]
-        c_names = ['F01', 'F02', 'F03', 'F04', 'F05', 'F06', 'F07', 'F08', 'F09', 'F10'] + ['F' + str(x) for x in range(10, 32)]
-        id_names = {id: c_names[x] for x, id in enumerate(cids)}
-        turbines = pd.read_csv('./cache/data/turbines.csv')
-        turbines_ori = pd.read_csv('./cache/data/全场_原始数据_2024-08-01_19-40-25.csv')
-        turbines_ori['风机'] = turbines_ori['风机'].apply(lambda x: re.sub(r'\(.*?\)|\[.*?\]|\{.*?\}', '', x))
-        turbines_ori.rename(columns={"时间": "C_TIME"}, inplace=True)
-        turbines_ori = turbines_ori[['C_TIME', '风机', '低位首触机组运行状态', '高位首触机组运行状态']]
-        turbines_ori['C_TIME'] = pd.to_datetime(turbines_ori['C_TIME'])
-        turbines['C_TIME'] = pd.to_datetime(turbines['C_TIME'])
+        engine = self.create_database()  # one engine serves every turbine query
         for number in self.opt.turbineloc:
-            # number = self.opt.usable_power['turbine_id']
             # turbine-head data
-            turbine = turbines[turbines['C_EQUIPMENT_NO'] == number]
-            turbine_ori = turbines_ori[(turbines_ori['风机'] == id_names[number]) & (turbines_ori['低位首触机组运行状态'] <= 3) & (turbines_ori['高位首触机组运行状态'] <= 3)]
-            turbine = pd.merge(turbine, turbine_ori.loc[:, ["C_TIME", "风机"]], on="C_TIME")
-            turbine.drop(columns=['风机'], inplace=True)
-            turbine = key_field_row_cleaning(turbine, cols=['C_WS', 'C_ACTIVE_POWER'])
+            print("exporting data of turbine {}".format(number))
+            sql_turbine = "select C_TIME, C_WS, C_WD, C_ACTIVE_POWER from t_wind_turbine_status_data " \
+                          "WHERE C_EQUIPMENT_NO=" + str(number) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)  # + " and C_WS>0 and C_ACTIVE_POWER>0"
+            turbine = self.exec_sql(sql_turbine, engine)
+            turbine = cleaning(turbine, 'turbine', cols=['C_WS', 'C_ACTIVE_POWER'], dup=False)
+            turbine['C_TIME'] = pd.to_datetime(turbine['C_TIME'])
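+            # the filter below keeps only on-the-quarter-hour samples (minutes 00/15/30/45), i.e. 15-min resolution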
             turbine = turbine[turbine['C_TIME'].dt.strftime('%M').isin(['00', '15', '30', '45'])]
             # export all rows directly
-            saveData("turbine-{}.csv".format(number), turbine)
+            output_file = os.path.join(output_dir, f"turbine-{number}.csv")
+            os.makedirs(output_dir, exist_ok=True)
+            turbine.to_csv(output_file, index=False)  # saveData() would prepend its own base path again, so write the joined path directly

     def process_csv_files(self, input_dir, output_dir, M, N):  # MBD: duplicate timestamps are not handled
         if not os.path.exists(output_dir):
@@ -310,11 +178,11 @@ class DataBase(object):
             df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values = self.remove_abnormal_values(df, N)

             # print the outlier statistics
-            self.logger.info(f"processing file: {input_file}")
-            self.logger.info(f"-99 outliers removed: {count_abnormal1}")
-            self.logger.info(f"consecutive outliers removed: {count_abnormal2}")
-            self.logger.info(f"total rows removed: {total_removed}")
-            self.logger.info(f"removed consecutive-outlier values: {removed_continuous_values}\n")
+            print(f"processing file: {input_file}")
+            print(f"-99 outliers removed: {count_abnormal1}")
+            print(f"consecutive outliers removed: {count_abnormal2}")
+            print(f"total rows removed: {total_removed}")
+            print(f"removed consecutive-outlier values: {removed_continuous_values}\n")

             # save the processed CSV file
             df_clean.to_csv(output_file, index=False)
@@ -337,20 +205,6 @@ class DataBase(object):
         total_removed = abnormal_mask.sum()
         return df_clean, count_abnormal1, count_abnormal2, total_removed, removed_continuous_values

-    # —————————————————————————— accumulate the generated power of the turbines in each zone ——————————————————————————————
-    def zone_powers(self, input_dir):
-        z_power = {}
-        for zone, turbines in self.opt.zone.items():
-            dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv")) for z in self.opt.turbineloc if z in turbines]
-            z_power['C_TIME'] = dfs[0]['C_TIME']
-            sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1)
-            z_power[zone] = sum_power
-        z_power = pd.DataFrame(z_power)
-        z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
-        saveData("z-power-t.csv", z_power)
-
-
-
     # —————————————————————————— cleaning of -99 sentinels and consecutive outliers in turbine wind speed ——————————————————————————————
     def mark_abnormal_streaks(self, df, columns, min_streak):
         abnormal_mask = pd.Series(False, index=df.index)
@@ -391,27 +245,23 @@ class DataBase(object):
         :param database:
         :return:
         """
-        # self.clear_data()
+        self.clear_data()
         self.get_process_power()
-        # self.get_process_dq()
-        # self.get_process_cdq()
-        # self.get_process_NWP()
-        # self.get_process_tower()
-        # self.indep_process()
-        self.get_process_turbine()
-        self.process_csv_files('./cache/data', './cache/data', 50, 5)
-        self.TimeMerge('./cache/data', './cache/data', 50)
-        self.zone_powers('./cache/data')
+        self.get_process_turbine(f'../../cluster/{station_id}')
+        self.process_csv_files(f'../../cluster/{station_id}', f'../../cluster/{station_id}', 50, 5)
+        self.TimeMerge(f'../../cluster/{station_id}', f'../../cluster/{station_id}', 50)
+        # self.zone_powers('../cluster/data')
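+        # order: wipe dataloc, export plant power (power.csv), export per-turbine csvs,
+        # remove outliers, then time-align and merge via TimeMerge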

 if __name__ == '__main__':
-    from logs import Log
-    from config import myargparse
-    import matplotlib.pyplot as plt
-    import matplotlib.colors as mcolors
     import pandas as pd
-
-    args = myargparse(discription="station-side configuration", add_help=False)
-    opt = args.parse_args_and_yaml()
-    log = Log().logger
-    db = DataBase(begin='', end='', opt=opt, logger=log)
+    turbineloc = [x for x in range(76, 151, 1)]
+    c_names = ['G01', 'G02', 'G03', 'G04', 'G05', 'G06', 'G07', 'G08', 'G09', 'G10'] + ['G' + str(x) for x in range(11, 76)]
+    id_names = {id: c_names[x] for x, id in enumerate(turbineloc)}
+    args = {'database': 'mysql+pymysql://root:mysql_T7yN3E@192.168.12.10:19306/ipfcst_j00260_20250507161106',
+            'cap': 225,
+            'id_name': id_names,
+            'turbineloc': turbineloc,
+            'dataloc': f'../../cluster/{station_id}'}  # keep clear_data() aimed at the same directory the exports write to
+    opt = argparse.Namespace(**args)
+    db = DataBase(begin='2025-01-01', end='2025-05-01', opt=opt)
     db.data_process()
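+    # a minimal sketch of reading one export back (assumes data_process has run):
+    #   df = pd.read_csv(f'../../cluster/{station_id}/power.csv', parse_dates=['C_TIME'])
+    # expected outputs under ../../cluster/260/: power.csv plus one cleaned turbine-<n>.csv
+    # per unit in turbineloc (76-150), and whatever TimeMerge writes on top of them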