@@ -1,388 +1,267 @@
-import pymysql
 import pandas as pd
-import numpy as np
+import datetime, time
+import yaml
+import os
+import pymysql
 from sqlalchemy import create_engine
-import matplotlib.pyplot as plt
 import pytz
-plt.rcParams['font.sans-serif'] = ['SimHei']
-import utils.savedata
-from utils import Arg
-from norm import Normalize
-arg = Arg.Arg()
-norm = Normalize()
-def clear_data():
-    """
-    Delete all csv files
-    :return:
-    """
-    # Set the folder path
-    import glob
-    import os
-    folder_path = arg.dataloc
-
-    # Use glob to collect every .csv file path
-    csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
-
-    # Iterate over the .csv files and delete them
-    for file_path in csv_files:
-        os.remove(file_path)
-    print("Cleared all csv files")
-
-def create_database(database):
-    """
-    Create the database connection
-    :param database: database connection URL
-    :return:
-    """
-    engine = create_engine(database)
-    return engine
-
-def exec_sql(sql, engine):
-    """
-    Fetch data from the database
-    :param sql: SQL statement
-    :param engine: database engine object
-    :return:
-    """
-    df = pd.read_sql_query(sql, engine)
-    return df
-
-def get_process_NWP(database):
-    """
-    Fetch NWP data from the database and apply simple processing
-    :param database:
-    :return:
-    """
-    # NWP data
-    engine = create_database(database)
-    sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR, C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"  # NWP fields for the solar plant
-    NWP = exec_sql(sql_NWP, engine)
-
-    # Drop the last three digits (milliseconds -> seconds)
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(str)
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].str[:-3]
-
-    # Convert the timestamp column to datetime
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].astype(float)
-    NWP['C_PRE_TIME'] = pd.to_datetime(NWP['C_PRE_TIME'], unit='s')
-
-    # Convert the datetimes to the local time zone
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.tz_localize(pytz.utc).dt.tz_convert('Asia/Shanghai')
-
-    # Format the datetimes as Y-m-d H:M:S
-    NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
-
-    NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
-
-    utils.savedata.saveData("NWP.csv", NWP)
-    norm.normalize(NWP)
-    return NWP
-
-
-def get_process_weather(database):
-    """
-    Fetch environmental monitor (weather station) data
-    :param database:
-    :return:
-    """
-    engine = create_database(database)
-    print("Available environmental monitors: {}".format(arg.weatherloc))
-    for i in arg.weatherloc:
-        print("Exporting data for environmental monitor {}".format(i))
-        # Drop useless columns
-        drop_colmns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1", "C_DATA2", "C_DATA3", "C_DATA4", "C_DATA5", "C_DATA6", "C_DATA7", "C_DATA8", "C_DATA9", "C_DATA10", "C_STATUS", "C_IS_GENERATED", "C_ABNORMAL_CODE"]
+from getdata.data_cleaning import cleaning, rm_duplicated
+current_path = os.path.dirname(__file__)
+dataloc = current_path + '/data/'
+weatherloc = [1]
-        get_colmns = []
-        # Query all column names of the table
-        result_set = exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine)
-        for name in result_set.iloc[:, 0]:
-            if name not in drop_colmns:
-                get_colmns.append(name)
-
-        all_columns_str = ", ".join([f'{col}' for col in get_colmns])
-
-        weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO=" + str(i)
-        weather = exec_sql(weather_sql, engine)
-        utils.savedata.saveData("weather/weather-{}.csv".format(i), weather)
-        norm.normalize(weather)
-
-
-def get_process_power(database):
-    """
-    Fetch plant-level power data
-    :param database:
-    :return:
-    """
-    engine = create_database(database)
-    sql_cap = "select C_CAPACITY from t_electric_field"
-    cap = exec_sql(sql_cap, engine)['C_CAPACITY']
-    sql_power = "select C_TIME,C_REAL_VALUE from t_power_station_status_data"
-    powers = exec_sql(sql_power, engine)
-    mask1 = powers['C_REAL_VALUE'] > float(cap)
-    mask = powers['C_REAL_VALUE'] == -99
-
-    mask = mask | mask1
-    print("{} power records to be removed".format(mask.sum()))
-    powers = powers[~mask]
-
-    utils.savedata.saveData("power.csv", powers)
-    power5, power_index = [], [0]  # power list, index list
-    ps = 0
-    # Aggregate the power data to 5-minute intervals
-    for i, power in powers.iterrows():
-        real_value = power['C_REAL_VALUE']
-        ps += real_value
-        if str(power['C_TIME'].minute)[-1] in ('0', '5'):
-            power_index.append(i)
-            num = power_index[-1] - power_index[-2]
-            num = num if num != 0 else 1
-            psa = round(ps / num, 2)
-            power5.append([power['C_TIME'], psa])
-            ps = 0
-    power5 = pd.DataFrame(power5, columns=['C_TIME', 'C_REAL_VALUE'])
-    utils.savedata.saveData("power5.csv", power5)
-    norm.normalize(power5)
-
-
-def get_process_dq(database):
+def readData(name):
     """
-    Fetch short-term forecast results
-    :param database:
+    Read data
+    :param name: file name
     :return:
     """
-    engine = create_database(database)
-    sql_dq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_short_term_his"
-    dq = exec_sql(sql_dq, engine)
-    dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
-    utils.savedata.saveData("dq.csv", dq)
-
-def get_process_cdq(database):
-    """
-    Fetch ultra-short-term forecast results
-    :param database:
-    :return:
-    """
-    engine = create_database(database)
-    sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE from t_forecast_power_ultra_short_term_his"
-    cdq = exec_sql(sql_cdq, engine)
-    cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'], unit='ms')
-    utils.savedata.saveData("cdq.csv", cdq)
+    path = dataloc + r"/" + name
+    return pd.read_csv(path)
-def indep_process():
+def saveData(name, data):
     """
-    Further data processing: time alignment, etc.
+    Save data
+    :param name: file name
+    :param data: data
     :return:
     """
-    # Environmental monitor data processing
-    for i in arg.weatherloc:
-        weather = utils.savedata.readData("/weather/weather-{}.csv".format(i))
-        # Check whether each column is entirely -99
-        all_minus_99 = (weather == -99).all()
-
-        # Get the names of the columns that are all -99
-        cols_to_drop = all_minus_99[all_minus_99 == True].index.tolist()
-
-        # Drop those columns with drop()
-        weather = weather.drop(cols_to_drop, axis=1)
-
-        # MBD: drop the columns that are partly -99; replace -99 with NaN
-        weather_nan = weather.replace(-99, np.nan, inplace=False)
-        # Drop columns with less than 80% non-NaN values
-        weather = weather.dropna(axis=1, thresh=len(weather_nan) * 0.8)
-        weather = weather.replace(np.nan, -99, inplace=False)
-        # Drop columns whose values are all identical
-        weather = weather.loc[:, (weather != weather.iloc[0]).any()]
-        utils.savedata.saveData("/weather/weather-{}-process.csv".format(i), weather)
-
-    # Time alignment
-    weather1 = utils.savedata.readData("/weather/weather-{}-process.csv".format(1))
-    # tower2 = utils.savedata.readData("/tower/tower-{}-process.csv".format(2))
-    # tower1 = tower1[tower1['C_TIME'].isin(tower2['C_TIME'])]
-    # tower2 = tower2[tower2['C_TIME'].isin(tower1['C_TIME'])]
-
-    utils.savedata.saveData("/weather/weather-{}-process.csv".format(1), weather1)
-    # utils.savedata.saveData("/tower/tower-{}-process.csv".format(2), tower2)
-
-    # Align time across all tables
-    filenames = ["/NWP.csv", "/power.csv", "power5.csv", "/dq.csv", "/cdq.csv", '/weather/weather-1-process.csv']
-    dataframes = []
-
-    for name in filenames:
-        dataframes.append(utils.savedata.readData(name))
-
-    # Find the latest start time and the earliest end time
-    max_start_time = max(df['C_TIME'].min() for df in dataframes)
-    min_end_time = min(df['C_TIME'].max() for df in dataframes)
-
-    print(max_start_time)
-    print(min_end_time)
-
-    # Trim each DataFrame so only data inside [max_start_time, min_end_time] is kept
-    for i, df in enumerate(dataframes):
-        df['C_TIME'] = pd.to_datetime(df['C_TIME'])  # ensure the time column is datetime
-        df_filtered = df[(df['C_TIME'] >= max_start_time) & (df['C_TIME'] <= min_end_time)]
-
-        # Save the trimmed result back under the original file name
-        utils.savedata.saveData(filenames[i], df_filtered)
-
-def NWP_indep_process():
+    path = dataloc + r"/" + name
+    os.makedirs(os.path.dirname(path), exist_ok=True)
+    data.to_csv(path, index=False)
+
+
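
The two module-level helpers above replace the old utils.savedata module: both resolve paths relative to dataloc, and saveData creates intermediate directories on demand. A minimal round-trip sketch (the file name and frame contents are made up for illustration):

    import pandas as pd

    demo = pd.DataFrame({'C_TIME': ['2023-04-16 12:00:00'], 'C_REAL_VALUE': [1.0]})
    saveData("demo/demo.csv", demo)                 # writes <module dir>/data/demo/demo.csv
    assert readData("demo/demo.csv").equals(demo)   # index=False keeps the frame unchanged
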
+def timestamp_to_datetime(ts):
+    local_timezone = pytz.timezone('Asia/Shanghai')
+    if type(ts) is not int:
+        raise ValueError("timestamp must be an integer")
+    if len(str(ts)) == 13:  # millisecond timestamp
+        dt = datetime.datetime.fromtimestamp(ts / 1000, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    elif len(str(ts)) == 10:  # second timestamp
+        dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone)
+        return dt
+    else:
+        raise ValueError("invalid timestamp format")
+
+
+def timestr_to_timestamp(time_str):
     """
-    Split the NWP data into N datasets at the missing-value gaps
-    :return:
+    Convert a time string to a millisecond timestamp (integers are passed through unchanged)
+    :param time_str: int or str
+    :return: int, millisecond timestamp
     """
-    # Further NWP processing
-    NWP = utils.savedata.readData("NWP.csv")
-    df = pd.to_datetime(NWP['C_TIME'])
-    time_diff = df.diff()
-    time_diff_threshold = pd.Timedelta(minutes=15)
-    missing_values = df[time_diff > time_diff_threshold]
-    print("Number of missing NWP records: {}".format(len(missing_values)))
-    print(missing_values)
-
-    # Save to file
-    utils.savedata.saveVar("NWP_miss.pickle", missing_values)
-    split_indices = []
-    for i in range(len(missing_values)):
-        if i == 0:
-            split_indices.append((0, missing_values.index[i]))
+    if isinstance(time_str, str):
+        if len(time_str) == 10:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d')
+            return int(round(time.mktime(dt.timetuple())) * 1000)
+        elif len(time_str) in {17, 18, 19}:
+            dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S')  # strptime must match the string's format exactly
+            return int(round(time.mktime(dt.timetuple())) * 1000)  # convert to a millisecond timestamp
         else:
-            split_indices.append((missing_values.index[i - 1], missing_values.index[i]))
-    split_indices.append((missing_values.index[-1], len(df)))  # MBD: the split was missing one point
-    split_datasets = [NWP.iloc[start:end, :] for start, end in split_indices]
-    for i, split_df in enumerate(split_datasets):
-        utils.savedata.saveData("Dataset_training/NWP/NWP_{}.csv".format(i), split_df)
-
-    return split_datasets
-
-
-# def power_indep_process():
-#     NWP = utils.savedata.readData("power.csv")
-
-def Data_split():
-    """
-    This function is unused; safe to skip
-    :return:
-    """
-    NWP = utils.savedata.readData("power_15min.csv")
-    df = pd.to_datetime(NWP['C_TIME'])
-    time_diff = df.diff()
-    time_diff_threshold = pd.Timedelta(minutes=15)
-    missing_values = df[time_diff > time_diff_threshold]
-    print("Number of missing NWP records: {}".format(len(missing_values)))
-    print(missing_values)
-    NWP_miss = utils.savedata.readVar("NWP_miss.pickle")
-
-    for t in missing_values.index:
-        a = t - 1
-        b = t
-        time1 = NWP['C_TIME'][a]
-        time2 = NWP['C_TIME'][b]
-        df = pd.to_datetime([time1, time2])
-        # Compute the time difference
-        time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
-        print(time_diff)
-
-    time1 = "2022-10-27 14:00:00"
-    time2 = "2023-04-16 12:00:00"
-    df = pd.to_datetime([time1, time2])
-    # Compute the time difference
-    time_diff = (df[1] - df[0]) / pd.Timedelta(minutes=15)
-    print(time_diff)
-
-def time_all_in():
-    """
-    Currently unused. Its goal is to pad the turbine-head data: find the missing time points and fill them with -99
-    :return:
-    """
-    filenames = []
-    dataframes = []
-    for i in arg.turbineloc:
-        filenames.append("/turbine-15/turbine-{}.csv".format(i))
-    for name in filenames:
-        dataframes.append(utils.savedata.readData(name))
-
-    for df in dataframes:
-        df['C_TIME'] = pd.to_datetime(df['C_TIME'])
-
-        # Build a complete time-series index covering all possible time points
-        start_time = df['C_TIME'].min()
-        end_time = df['C_TIME'].max()
-        full_time_range = pd.date_range(start_time, end_time, freq='15min')
+            raise ValueError("Time string length not supported!")
+    else:
+        return time_str
+
+
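
Together, the two helpers above convert between the millisecond timestamps stored in the forecast tables and Asia/Shanghai wall-clock time. A round-trip sketch; note that timestr_to_timestamp goes through time.mktime, so the exact round trip below assumes the host machine itself runs in Asia/Shanghai:

    ts = timestr_to_timestamp("2023-04-16 12:00:00")   # 13-digit millisecond stamp, e.g. 1681617600000
    dt = timestamp_to_datetime(ts)                     # tz-aware datetime in Asia/Shanghai
    assert dt.strftime('%Y-%m-%d %H:%M:%S') == "2023-04-16 12:00:00"
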
+class DataBase(object):
+    def __init__(self, begin, end, database):
+        self.begin = begin
+        self.end = end - pd.Timedelta(minutes=15)
+        self.begin_stamp = timestr_to_timestamp(str(begin))
+        self.end_stamp = timestr_to_timestamp(str(self.end))
+        self.database = database
+
+    def clear_data(self):
+        """
+        Delete all csv files
+        :return:
+        """
+        # Set the folder path
+        import glob
+        import os
+        folder_path = dataloc
+
+        # Use glob to collect every .csv file path
+        csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True)
+
+        # Iterate over the .csv files and delete them
+        for file_path in csv_files:
+            os.remove(file_path)
+        # self.logger.info("Cleared all csv files")
+
+    def create_database(self):
+        """
+        Create the database connection
+        :return:
+        """
+        engine = create_engine(self.database)
+        return engine
+
+    def exec_sql(self, sql, engine):
+        """
+        Fetch data from the database
+        :param sql: SQL statement
+        :param engine: database engine object
+        :return:
+        """
+        df = pd.read_sql_query(sql, engine)
+        return df
+
+    def split_time(self, data):
+        data['C_TIME'] = pd.to_datetime(data["C_TIME"])
+        data.set_index('C_TIME', inplace=True)
+        data = data.sort_index().loc[self.begin: self.end]
+        data.reset_index(drop=False, inplace=True)
+        return data
+
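
split_time relies on label-based .loc slicing over a sorted DatetimeIndex, which keeps both endpoints; that is also why __init__ pulls self.end back by 15 minutes. A standalone illustration with made-up values:

    import pandas as pd

    idx = pd.date_range('2023-01-01 00:00', periods=8, freq='15min')
    frame = pd.DataFrame({'v': range(8)}, index=idx)
    sub = frame.sort_index().loc['2023-01-01 00:15':'2023-01-01 01:00']
    assert len(sub) == 4   # endpoints included: 00:15, 00:30, 00:45, 01:00
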
+    def get_process_NWP(self):
+        """
+        Fetch NWP data from the database and apply simple processing
+        :return:
+        """
+        # NWP data
+        engine = self.create_database()
+        sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \
+                  "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \
+                  "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \
+                  "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \
+                  " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)  # NWP fields for the solar plant
+        NWP = self.exec_sql(sql_NWP, engine)
+
+        NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime)
+
+        NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'})
+        NWP = cleaning(NWP, 'NWP')
+        # NWP = self.split_time(NWP)
+        NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("NWP.csv", NWP)
+        print("Exported NWP data")
+        return NWP
+
+    def get_process_weather(self):
+        """
+        Fetch environmental monitor (weather station) data
+        :return:
+        """
+        engine = self.create_database()
+        print("Available environmental monitors: {}".format(weatherloc))
+        for i in weatherloc:
+            # Drop useless columns
+            drop_colmns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1", "C_DATA2", "C_DATA3", "C_DATA4", "C_DATA5", "C_DATA6", "C_DATA7", "C_DATA8", "C_DATA9", "C_DATA10", "C_STATUS", "C_IS_GENERATED", "C_ABNORMAL_CODE"]
+
+            get_colmns = []
+            # Query all column names of the table
+            result_set = self.exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine)
+            for name in result_set.iloc[:, 0]:
+                if name not in drop_colmns:
+                    get_colmns.append(name)
+
+            all_columns_str = ", ".join([f'{col}' for col in get_colmns])
+
+            weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO=" + str(i) + " and C_TIME between '{}' and '{}'".format(self.begin, self.end)
+            weather = self.exec_sql(weather_sql, engine)
+            weather['C_TIME'] = pd.to_datetime(weather['C_TIME'])
+            # weather = self.split_time(weather)
+            saveData("/weather-{}.csv".format(i), weather)
+            print("Exported data for environmental monitor {}".format(i))
+
+    def get_process_power(self):
+        """
+        Fetch plant-level power data
+        :return:
+        """
+        engine = self.create_database()
+        sql_cap = "select C_CAPACITY from t_electric_field"
+        cap = self.exec_sql(sql_cap, engine)['C_CAPACITY']
+        sql_power = "select C_TIME, C_REAL_VALUE, C_ABLE_VALUE, C_IS_RATIONING_BY_MANUAL_CONTROL, C_IS_RATIONING_BY_AUTO_CONTROL" \
+                    " from t_power_station_status_data where C_TIME between '{}' and '{}'".format(self.begin, self.end)
+        # if self.opt.usable_power["clean_power_by_signal"]:
+        #     sql_power += " and C_IS_RATIONING_BY_MANUAL_CONTROL=0 and C_IS_RATIONING_BY_AUTO_CONTROL=0"
+        powers = self.exec_sql(sql_power, engine)
+        mask1 = powers.loc[:, 'C_REAL_VALUE'].astype(float) > float(cap)
+        mask = powers['C_REAL_VALUE'] == -99
+
+        mask = mask | mask1
+        print("Actual power: {} records in total, {} to be removed".format(len(powers), mask.sum()))
+        powers = powers[~mask]
+        print("{} records remain after removal".format(len(powers)))
+        powers.reset_index(drop=True, inplace=True)
+        binary_map = {b'\x00': 0, b'\x01': 1}
+        powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map)
+        powers = rm_duplicated(powers)
+        saveData("power.csv", powers)
+
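
The binary_map step exists because MySQL BIT(1) columns typically come back through pymysql as one-byte bytes objects rather than ints; mapping them explicitly keeps the auto-rationing flag numeric. The same mapping in isolation:

    import pandas as pd

    flags = pd.Series([b'\x00', b'\x01', b'\x00'])   # how a BIT(1) column usually arrives
    assert flags.map({b'\x00': 0, b'\x01': 1}).tolist() == [0, 1, 0]
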
+    def get_process_dq(self):
+        """
+        Fetch short-term forecast results
+        :return:
+        """
+        engine = self.create_database()
+        sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \
+                 "where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        dq = self.exec_sql(sql_dq, engine)
+        # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms')
+        dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime)
+        # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1]
+        # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True)
+        dq = cleaning(dq, 'dq', cols=['C_FP_VALUE'])
+        dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("dq.csv", dq)
+        print("Exported dq data")
+
+    def get_process_cdq(self):
+        """
+        Fetch ultra-short-term forecast results
+        :return:
+        """
+        engine = self.create_database()
+        sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from " \
+                  "t_forecast_power_ultra_short_term_his" \
+                  " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp)
+        cdq = self.exec_sql(sql_cdq, engine)
+        cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime)
+        cdq = cleaning(cdq, 'cdq', cols=['C_ABLE_VALUE'], dup=False)
+        # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])]
+        cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S')
+        saveData("cdq.csv", cdq)
+
+    def indep_process(self):
+        """
+        Further data processing: time alignment, etc.
+        :return:
+        """
+        # Environmental monitor data processing
+        for i in weatherloc:
+            weather = readData("/weather-{}.csv".format(i))
+            weather = cleaning(weather, 'weather', cols=['C_GLOBALR', 'C_DIRECTR', 'C_DIFFUSER', 'C_RH', 'C_AIRT', 'C_P', 'C_WS', 'C_WD'])
+            saveData("/weather-{}-process.csv".format(i), weather)
+
+    def data_process(self):
+        """
+        Top-level driver for data export and initial processing
+        :return:
+        """
+        self.clear_data()
+        try:
+            self.get_process_power()
+            self.get_process_dq()
+            # self.get_process_cdq()
+            self.get_process_NWP()
+            self.get_process_weather()
+            self.indep_process()
+        except Exception as e:
+            print("Data export failed: {}".format(e.args))
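
A sketch of how the class is meant to be driven end to end; the connection URL and the date window below are placeholders, not values taken from this change:

    if __name__ == '__main__':
        begin = pd.to_datetime('2023-01-01 00:00:00')
        end = pd.to_datetime('2023-04-16 12:00:00')
        db = DataBase(begin, end, "mysql+pymysql://user:password@host:3306/dbname")
        db.data_process()   # clears ./data, then exports power, dq, NWP and weather csvs
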
-        # Create an empty DataFrame from the complete time-series index
-        full_df = pd.DataFrame(index=full_time_range)
-        full_df.index.name = 'C_TIME'
-
-        # Merge the original data with the empty DataFrame
-        merged_df = full_df.merge(df, how='left', left_on='time', right_on='time')
-
-        # Fill missing values with -99 (except the time column)
-        merged_df.fillna(-99, inplace=True)
-        merged_df.reset_index(inplace=True)
-
-
-
-def data_process(database):
-    """
-    Top-level driver for data export and initial processing
-    :param database:
-    :return:
-    """
-    clear_data()
-    get_process_power(database)
-    get_process_dq(database)
-    get_process_cdq(database)
-    get_process_NWP(database)
-    get_process_weather(database)
-
-    indep_process()
-    NWP_indep_process()
-    norm.save_yml({'mean': norm.mean, 'std': norm.std}, arg.normloc)
-
-if __name__ == '__main__':
-
-    import os
-    import glob
-    # Set the folder path
-    folder_path = '../data'
-
-    # Use glob to collect all .csv files
-    csv_files = glob.glob(os.path.join(folder_path, '*.csv'))
-
-    # Iterate over the .csv files and delete them
-    for file_path in csv_files:
-        os.remove(file_path)
-    # database = "mysql+pymysql://root:!QAZ2root@192.168.1.205:3306/ipfcst-sishui-a"
-    # engine = create_database(database)
-    #
-    # # NWP data
-    # sql_NWP = "select C_SC_DATE,C_SC_TIME,C_T,C_RH,C_PRESSURE,C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170,C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp"
-    # NWP = exec_sql(sql_NWP,engine)
-    #
-    # # Per-turbine power
-    # sql_wind = "select C_WS,C_ACTIVE_POWER from t_wind_turbine_status_data_15 WHERE C_EQUIPMENT_NO=2 and C_WS>0 and C_ACTIVE_POWER>0 "
-    # df_wind = exec_sql(sql_wind,engine)
-    # print(df_wind)
-
-    # Read total power data
-    # sql_power = "select * from t_power_station_status_data"
-    # df_power = exec_sql(sql_power, engine)
-    filenames = []
-    dataframes = []
-    for i in arg.turbineloc:
-        filenames.append("../data/turbine-15/turbine-{}.csv".format(i))
-    for name in filenames:
-        dataframes.append(pd.read_csv(name).iloc[:7000, :])
-
-    # for df in enumerate(dataframes):
-    #     df =
-    mean_of_first_columns = pd.concat([df['C_WS'] for df in dataframes], axis=1).mean(axis=1)
-    mean_of_second_columns = (pd.concat([df['C_ACTIVE_POWER'] for df in dataframes], axis=1).sum(axis=1) / 1000).astype(int)
-    print(len(mean_of_first_columns))
-
-    plt.scatter(mean_of_first_columns, mean_of_second_columns)
-    plt.show()