import pandas as pd import datetime, time import yaml import pymysql import os from sqlalchemy import create_engine import pytz from cache.data_cleaning import cleaning, rm_duplicated current_path = os.path.dirname(__file__) dataloc = current_path + '/data/' def readData(name): """ 读取数据 :param name: 名字 :return: """ path = dataloc + r"/" + name return pd.read_csv(path) def saveData(name, data): """ 存放数据 :param name: 名字 :param data: 数据 :return: """ path = dataloc + r"/" + name os.makedirs(os.path.dirname(path), exist_ok=True) data.to_csv(path, index=False) def timestamp_to_datetime(ts): local_timezone = pytz.timezone('Asia/Shanghai') if type(ts) is not int: raise ValueError("timestamp-时间格式必须是整型") if len(str(ts)) == 13: dt = datetime.datetime.fromtimestamp(ts/1000, tz=pytz.utc).astimezone(local_timezone) return dt elif len(str(ts)) == 10: dt = datetime.datetime.fromtimestamp(ts, tz=pytz.utc).astimezone(local_timezone) return dt else: raise ValueError("timestamp-时间格式错误") def dt_tag(dt): date = dt.replace(hour=0, minute=0, second=0) delta = (dt - date) / pd.Timedelta(minutes=15) return delta + 1 def timestr_to_timestamp(time_str): """ 将时间戳或时间字符串转换为datetime.datetime类型 :param time_data: int or str :return:datetime.datetime """ if isinstance(time_str, str): if len(time_str) == 10: dt = datetime.datetime.strptime(time_str, '%Y-%m-%d') return int(round(time.mktime(dt.timetuple())) * 1000) elif len(time_str) in {17, 18, 19}: dt = datetime.datetime.strptime(time_str, '%Y-%m-%d %H:%M:%S') # strptime字符串解析必须严格按照字符串中的格式 return int(round(time.mktime(dt.timetuple())) * 1000) # 转换成毫秒级的时间戳 else: raise ValueError("时间字符串长度不满足要求!") else: return time_str class DataBase(object): def __init__(self, begin, end, opt, logger): self.opt = opt self.begin = begin self.his_begin = self.begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4) self.end = end + pd.Timedelta(days=1) - pd.Timedelta(minutes=15) self.begin_stamp = timestr_to_timestamp(str(begin)) self.his_begin_stamp = timestr_to_timestamp(str(self.his_begin)) self.end_stamp = timestr_to_timestamp(str(self.end)) self.database = opt.database self.logger = logger def clear_data(self): """ 删除所有csv :return: """ # 设置文件夹路径 import glob import os folder_path = dataloc # 使用 glob 获取所有的 .csv 文件路径 csv_files = glob.glob(os.path.join(folder_path, '**/*.csv'), recursive=True) # 遍历所有 .csv 文件并删除 for file_path in csv_files: os.remove(file_path) self.logger.info("清除所有csv文件") def create_database(self): """ 创建数据库连接 :param database: 数据库地址 :return: """ engine = create_engine(self.database) return engine def exec_sql(self, sql, engine): """ 从数据库获取数据 :param sql: sql语句 :param engine: 数据库对象 :return: """ df = pd.read_sql_query(sql, engine) return df def split_time(self, data): data['C_TIME'] = pd.to_datetime(data["C_TIME"]) data.set_index('C_TIME', inplace=True) data = data.sort_index().loc[self.begin: self.end] data.reset_index(drop=False, inplace=True) return data def get_process_NWP(self): """ 从数据库中获取NWP数据,并进行简单处理 :param database: :return: """ # NPW数据 engine = self.create_database() if self.opt.new_field: sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR, C_TPR," \ "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, C_SOLAR_ZENITH," \ "C_LCC, C_MCC, C_HCC, C_TCC, C_CLEARSKY_GHI, C_DNI_CALCD," \ "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \ "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \ " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 新NWP字段 else: sql_NWP = "select C_PRE_TIME,C_T,C_RH,C_PRESSURE, C_SWR," \ "C_DIFFUSE_RADIATION, C_DIRECT_RADIATION, " \ "C_WD10,C_WD30,C_WD50,C_WD70,C_WD80,C_WD90,C_WD100,C_WD170," \ "C_WS10,C_WS30,C_WS50,C_WS70,C_WS80,C_WS90,C_WS100,C_WS170 from t_nwp " \ " where C_PRE_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) # 老NWP字段 NWP = self.exec_sql(sql_NWP, engine) NWP['C_PRE_TIME'] = NWP['C_PRE_TIME'].apply(timestamp_to_datetime) NWP = NWP.rename(columns={'C_PRE_TIME': 'C_TIME'}) NWP['DT_TAG'] = NWP.apply(lambda x: dt_tag(x['C_TIME']), axis=1) NWP = cleaning(NWP, 'NWP', self.logger) # NWP = self.split_time(NWP) NWP['C_TIME'] = NWP['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S') saveData("NWP.csv", NWP) self.logger.info("导出nwp数据") return NWP def get_process_weather(self): """ 获取环境检测仪数据 :param database: :return: """ engine = self.create_database() self.logger.info("现有环境监测仪:{}".format(self.opt.weatherloc)) for i in self.opt.weatherloc: # 删除没用的列 drop_colmns = ["C_ID", "C_EQUIPMENT_NO", "C_DATA1","C_DATA2","C_DATA3","C_DATA4","C_DATA5","C_DATA6","C_DATA7","C_DATA8","C_DATA9","C_DATA10", "C_STATUS", "C_IS_GENERATED","C_ABNORMAL_CODE"] get_colmns = [] # 查询表的所有列名 result_set = self.exec_sql("SHOW COLUMNS FROM t_weather_station_status_data", engine) for name in result_set.iloc[:,0]: if name not in drop_colmns: get_colmns.append(name) all_columns_str = ", ".join([f'{col}' for col in get_colmns]) weather_sql = "select " + all_columns_str + " from t_weather_station_status_data where C_EQUIPMENT_NO="+ str(i) + " and C_TIME between '{}' and '{}'".format(self.his_begin, self.end) weather = self.exec_sql(weather_sql, engine) weather['C_TIME'] = pd.to_datetime(weather['C_TIME']) # weather = self.split_time(weather) saveData("/weather-{}.csv".format(i), weather) self.logger.info("环境监测仪{}导出数据".format(i)) def get_process_power(self): """ 获取整体功率数据 :param database: :return: """ engine = self.create_database() sql_cap = "select C_CAPACITY from t_electric_field" cap = self.exec_sql(sql_cap, engine)['C_CAPACITY'] self.opt.cap = float(cap) sql_power = "select C_TIME,C_REAL_VALUE, C_ABLE_VALUE, C_REFERENCE_POWER_BY_SAMPLE, C_IS_RATIONING_BY_MANUAL_CONTROL," \ " C_IS_RATIONING_BY_AUTO_CONTROL from t_power_station_status_data" \ " where C_TIME between '{}' and '{}'".format(self.his_begin, self.end) powers = self.exec_sql(sql_power, engine) mask2 = powers[self.opt.predict] < 0 mask1 = powers.loc[:, 'C_REAL_VALUE'].astype(float) > float(cap) mask = powers['C_REAL_VALUE'] == -99 mask = mask | mask1 | mask2 self.logger.info("实际功率共{}条,要剔除功率有{}条".format(len(powers), mask.sum())) powers = powers[~mask] self.logger.info("剔除完后还剩{}条".format(len(powers))) powers.reset_index(drop=True, inplace=True) binary_map = {b'\x00': 0, b'\x01': 1} powers['C_IS_RATIONING_BY_AUTO_CONTROL'] = powers['C_IS_RATIONING_BY_AUTO_CONTROL'].map(binary_map) powers = rm_duplicated(powers, self.logger) saveData("power.csv", powers) def get_process_dq(self): """ 获取短期预测结果 :param database: :return: """ engine = self.create_database() sql_dq = "select C_FORECAST_TIME AS C_TIME, C_FP_VALUE from t_forecast_power_short_term " \ "where C_FORECAST_TIME between {} and {}".format(self.his_begin_stamp, self.end_stamp) dq = self.exec_sql(sql_dq, engine) # dq['C_TIME'] = pd.to_datetime(dq['C_TIME'], unit='ms') dq['C_TIME'] = dq['C_TIME'].apply(timestamp_to_datetime) # dq = dq[dq['C_FORECAST_HOW_LONG_AGO'] == 1] # dq.drop('C_FORECAST_HOW_LONG_AGO', axis=1, inplace=True) dq = cleaning(dq, 'dq', self.logger, cols=['C_FP_VALUE']) dq['C_TIME'] = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S') saveData("dq.csv", dq) self.logger.info("导出dq数据") def get_process_cdq(self): """ 获取超短期预测结果 :param database: :return: """ engine = self.create_database() sql_cdq = "select C_FORECAST_TIME AS C_TIME, C_ABLE_VALUE, C_FORECAST_HOW_LONG_AGO from " \ "t_forecast_power_ultra_short_term_his" \ " where C_FORECAST_TIME between {} and {}".format(self.begin_stamp, self.end_stamp) cdq = self.exec_sql(sql_cdq, engine) cdq['C_TIME'] = cdq['C_TIME'].apply(timestamp_to_datetime) cdq = cleaning(cdq, 'cdq', self.logger, cols=['C_ABLE_VALUE'], dup=False) # cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(str(self.opt.predict_point)[1:])] cdq['C_TIME'] = cdq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S') saveData("cdq.csv", cdq) def indep_process(self): """ 进一步数据处理:时间统一处理等 :return: """ # 环境监测仪数据处理 for i in self.opt.weatherloc: weather = readData("/weather-{}.csv".format(i)) env_columns = [ele for ele in self.opt.env_columns if ele not in ['C_TIME', 'C_FP_VALUE', 'C_REAL_VALUE', 'error']] weather = cleaning(weather, 'weather', self.logger, cols=env_columns) weather = weather[weather[self.opt.usable_power["env"]] >= 0] weather['C_TIME'] = pd.to_datetime(weather['C_TIME']) weather_ave = weather.resample('15T', on='C_TIME').mean().reset_index() weather_ave = weather_ave.dropna(subset=[self.opt.usable_power['env']]) weather_ave.set_index('C_TIME', inplace=True) weather_ave = weather_ave.interpolate(method='linear') weather_ave = weather_ave.fillna(method='ffill') weather_ave = weather_ave.fillna(method='bfill') weather_ave.reset_index(drop=False, inplace=True) weather_ave.iloc[:, 1:] = weather_ave.iloc[:, 1:].round(2) saveData("/weather-{}-process.csv".format(i), weather_ave) def data_process(self): """ 数据导出+初步处理的总操控代码 :param database: :return: """ self.clear_data() try: self.get_process_power() self.get_process_dq() self.get_process_cdq() self.get_process_NWP() self.get_process_weather() self.indep_process() except Exception as e: self.logger.critical("导出数据出错:{}".format(e.args))