#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/11/23 14:07
# file: clocking.py
# author: David
# company: shenyang JY
import sys
from xml.dom import ValidationErr

import pandas as pd
import threading
import datetime
from cache.limited_power_curve import LimitPower
from apscheduler.schedulers.background import BackgroundScheduler
import time
import os
import pickle
import numpy as np
from validate import Validation, ValidationError
np.random.seed(42)
from cache.calculate import calculate_acc
from cache.formula import Formulas
from cache.inputData import DataBase
from cache.monitor import Monitor
current_path = os.path.dirname(__file__)
from pytz import timezone


class Clock(object):
    """Scheduled retraining and blend-coefficient update for the forecast fix.

    The class wires together a config holder (``args``), a model wrapper
    (``fmi``), two data-preparation helpers (``process`` / ``features``) and
    an error-fix helper (``fix``), and exposes ``calculate_coe`` as the job
    that is run by the background scheduler.
    """

    def __init__(self, logger, args, va, process, features, fmi, fix):
        """Store collaborators and load the current options.

        Args:
            logger: Object exposing ``info`` (used for all progress messages).
            args: Config holder exposing ``parse_args_and_yaml`` and
                ``save_args_yml``.
            va: Object exposing ``set_lp`` (called from ``update_property``).
            process: Exposes ``get_train_data`` / ``get_test_data`` and an
                ``opt`` attribute.
            features: Exposes ``get_train_data`` / ``get_test_data`` and an
                ``opt`` attribute.
            fmi: Model wrapper exposing ``training`` and ``predict``.
            fix: Exposes ``history_error_clock`` and an ``opt`` attribute.
        """
        self.args = args
        self.va = va
        self.process = process
        self.features = features
        self.logger = logger
        self.opt = self.args.parse_args_and_yaml()
        self.mo = Monitor(logger, args)
        self.fmi = fmi
        self.fix = fix
        self.target = self.opt.predict
        self.logger.info("---以 {} 进行修模---".format(self.target))
        self.lp = LimitPower(self.logger, self.args, None)

    def update_thread(self):
        """Start the scheduler from a separate thread."""
        thread = threading.Thread(target=self.start_jobs)
        thread.start()

    def start_jobs(self):
        """Register the periodic jobs and start the background scheduler.

        ``calculate_coe`` runs daily at 23:00 (Asia/Shanghai) and the
        monitor's ``update_config`` runs every 60 seconds.
        """
        scheduler = BackgroundScheduler()
        scheduler.configure({'timezone': timezone("Asia/Shanghai")})
        scheduler.add_job(func=self.calculate_coe, trigger="cron", hour=23, minute=0)
        scheduler.add_job(func=self.mo.update_config, trigger="interval", seconds=60)
        scheduler.start()

    def cal_acc(self, df, target, opt):
        """Return ``calculate_acc`` for column ``target`` against C_REAL_VALUE.

        A copy of ``df`` is reshaped to the column names ``calculate_acc`` is
        given here (``realValue``, ``ableValue``, ``forecastAbleValue``);
        the caller's frame is not modified.
        """
        df = df.copy()
        df.rename(columns={'C_REAL_VALUE': 'realValue'}, inplace=True)
        df['ableValue'] = df['realValue']
        df['forecastAbleValue'] = df[target]
        df = df.apply(pd.to_numeric, errors='ignore')
        df['C_TIME'] = pd.to_datetime(df['C_TIME'])
        return calculate_acc(df, opt=opt)

    def date_diff(self, current_dt, repair_dt):
        """Return the number of days from ``repair_dt`` to ``current_dt``.

        Args:
            current_dt: A ``datetime.date``.
            repair_dt: A date string formatted as ``%Y-%m-%d``.
        """
        reference = datetime.datetime.strptime(repair_dt, '%Y-%m-%d').date()
        return (current_dt - reference).days

    def calculate_coe(self, install=False):
        """Optionally retrain the model, then refresh the blend coefficients.

        Steps, in order: reload the options, derive the data windows from
        ``opt.authentication['date']``, retrain when today is a retraining
        day of the month (or ``install`` is True), predict on the evaluation
        window, and grid-search the coefficients for each of the 16 forecast
        points whose ``opt.coe[...]['update']`` flag is not False.

        Args:
            install: When True, always retrain and use ``opt.repair_days``
                (instead of ``opt.update_add_train_days``) as the extra
                look-back for training data.
        """
        # NOTE: a blanket try/except that logged failures at CRITICAL level
        # used to wrap this body; it is disabled, so exceptions propagate.
        start = time.time()
        self.logger.info("检测系统当前的时间为:{}".format(
            time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start))))
        self.opt = self.args.parse_args_and_yaml()
        # NOTE: overriding opt.authentication['date'] with today's date is
        # disabled; the configured date is used as the window end.
        day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
        coe_start = day_end - pd.Timedelta(days=self.opt.update_coe_days)
        lookback_days = (self.opt.repair_days if install is True
                         else self.opt.update_add_train_days)
        day_start_db = coe_start - pd.Timedelta(days=lookback_days)
        if self.date_diff(day_start_db.date(), self.opt.authentication['full_cap']) < 0:
            # Never reach back before the 'full_cap' date from the config.
            day_start_db = datetime.datetime.strptime(
                self.opt.authentication['full_cap'], '%Y-%m-%d')
            self.logger.info("更新初始修模起始时间为全容量并网:{}".format(
                self.opt.authentication['full_cap']))
        # NOTE: refreshing data through DataBase(begin=day_start_db,
        # end=day_end, ...).data_process() and copying its cap into opt is
        # disabled; material() reads the CSV files under data/ instead.
        self.args.save_args_yml(self.opt)
        formula = Formulas(self.opt)

        # Retraining days are the multiples of the cycle up to day 30.
        # A non-positive cycle yields no retraining days (the previous
        # while-loop never terminated in that case).
        day = int(time.strftime('%d', time.localtime(start)))
        cycle = int(self.opt.repair_model_cycle)
        retrain_days = set(range(cycle, 31, cycle)) if cycle > 0 else set()
        if day in retrain_days or install is True:
            # ------------------- model retraining ------------------------
            self.repairing_model(day_start_db, coe_start)
            self.opt.authentication['repair'] = self.opt.authentication['date']
            self.args.save_args_yml(self.opt)

        self.logger.info("------------进入测试集自动计算-------------")
        coe_start += pd.Timedelta(days=1)
        nwp, env, dq, rp, rp_his = self.material(coe_start, day_end)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        zone_columns = list(self.opt.zone.keys())
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE'] + zone_columns]
        # These columns are passed through normalize() unscaled.
        passthrough = ['C_TIME', 'C_REAL_VALUE'] + zone_columns
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in passthrough]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in passthrough]
        _, _, nwp_features = self.normalize(nwp, passthrough, mean=mean, std=std)

        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)

        data_test, env = self.process.get_test_data(nwp_features, env_features)
        test_X, data_Y = self.features.get_test_data(data_test, env)
        result = self.fmi.predict(test_X, batch_size=8)

        # Attach the prediction to every sample once; the assignment does not
        # depend on the forecast point, so it is done outside the point loop.
        for i, sample in enumerate(data_Y):
            sample[1]["dq_fix"] = result[1][i]

        # 2. historical data
        for point in range(16):
            rows = [sample[1].iloc[point] for sample in data_Y]
            pre_data = pd.concat(rows, axis=1).T
            pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].apply(
                pd.to_numeric, errors='ignore')
            pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
            # Undo the normalization, then clamp to [0, cap].
            pre_data['dq_fix'] = (pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE']
                                  + self.opt.mean['C_REAL_VALUE'])
            pre_data['dq_fix'] = pre_data['dq_fix'].round(2)
            pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
            pre_data.loc[pre_data['dq_fix'] < 0, 'dq_fix'] = 0
            T = 'T' + str(point + 1)
            if self.opt.coe[T]['update'] is False:
                continue
            pre_data['history'] = self.fix.history_error_clock(pre_data, rp_his, 16 - point - 1)
            pre_data.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info("第{}点的时间周期为:{}-{}".format(
                T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
            self.update_coe(pre_data, formula, point, test=False)
        self.args.save_args_yml(self.opt)
        end = time.time()
        self.logger.info("定时任务:每天自动确认系数,用了 %s 秒 " % (end - start))

    def _search_blend_coe(self, frame, score, digits):
        """Grid-search the weights of ``dq_fix`` and ``history``.

        Both weights run over ``k / 170`` for ``k`` in ``range(5, 210)``.
        Each candidate blend is written to ``frame['new']`` (rounded to
        ``digits``) and scored with ``score(real_values, blend_values)``;
        the first strictly best score wins.

        Returns:
            Tuple ``(best_score, best_m, best_n)``; all zero when no
            candidate scores above 0.
        """
        real = frame['C_REAL_VALUE'].values
        best, best_m, best_n = 0, 0, 0
        for i in range(5, 210):
            weighted_fix = i / 170 * frame['dq_fix']  # invariant in the inner loop
            for j in range(5, 210):
                frame["new"] = round(weighted_fix + j / 170 * frame['history'], digits)
                acc = score(real, frame['new'].values)
                if acc > best:
                    best, best_m, best_n = acc, i / 170, j / 170
        return best, best_m, best_n

    def update_coe(self, pre_data, formula, point, test=False):
        """Find and store the blend coefficients for one forecast point.

        The result is written to ``self.opt.coe['T<point+1>']['m' / 'n']``
        (rounded to 3 decimals).

        Args:
            pre_data: Frame with ``C_TIME``, ``C_REAL_VALUE``, ``C_FP_VALUE``,
                ``dq_fix`` and ``history`` columns; a ``new`` column is
                added/overwritten when ``test`` is False.
            formula: Object exposing ``calculate_acc_northwest`` and
                ``calculate_acc_south``.
            point: Zero-based forecast point index.
            test: When True, evaluate against ``data/cdq.csv`` instead, print
                per-day accuracies and write the blended result to CSV.
        """
        T = 'T' + str(point + 1)
        if test is False:
            real = pre_data['C_REAL_VALUE'].values
            dq_acc = formula.calculate_acc_northwest(real, pre_data['C_FP_VALUE'].values)
            dq_fix_acc = formula.calculate_acc_northwest(real, pre_data['dq_fix'].values)
            his_fix_acc = formula.calculate_acc_northwest(real, pre_data['history'].values)
            best, best_coe_m, best_coe_n = self._search_blend_coe(
                pre_data, formula.calculate_acc_northwest, 3)
            self.logger.info(
                "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期的准确率:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_acc, T, best, dq_fix_acc, his_fix_acc))
        else:
            cdq = pd.read_csv(current_path + '/data/cdq.csv', parse_dates=['C_TIME'])
            cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'])
            cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(point + 1)]
            cdq = pd.merge(cdq, pre_data, on='C_TIME')
            cdq['时间'] = cdq['C_TIME'].dt.strftime("%Y-%m-%d")
            # Fixed evaluation settings handed to calculate_acc in this mode.
            cc = {
                'formulaType': 'DAY_SHORT_ACCURACY',
                'cap': '95.5',
                'province': 'E52',
                'electricType': 'E1',
                'stationCode': 'J00149'
            }
            import argparse
            config = argparse.Namespace(**cc)
            dq_accs, dq_fix_accs, cdq_accs, his_accs = [], [], [], []
            for dt, group in cdq.groupby('时间'):
                dq_acc = self.cal_acc(group, 'C_FP_VALUE', config)
                dq_fix_acc = self.cal_acc(group, 'dq_fix', config)
                cdq_acc = self.cal_acc(group, 'C_ABLE_VALUE', config)
                his_acc = self.cal_acc(group, 'history', config)
                dq_accs.append(dq_acc)
                dq_fix_accs.append(dq_fix_acc)
                cdq_accs.append(cdq_acc)
                his_accs.append(his_acc)
                print(dt, "这一天, 短期准确率:", dq_acc, "超短期公式准确率:", cdq_acc,
                      "神经网络准确率:", dq_fix_acc, "历史功率准确率:", his_acc)
            best, best_coe_m, best_coe_n = self._search_blend_coe(
                cdq, formula.calculate_acc_south, 2)
            # Leave the best blend (not the last candidate) in the output file.
            cdq['new'] = round(best_coe_m * cdq['dq_fix'] + best_coe_n * cdq['history'], 2)
            cdq.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info(
                "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), np.mean(dq_accs), T, best,
                    np.mean(cdq_accs), np.mean(dq_fix_accs), np.mean(his_accs)))
        self.opt.coe[T]['m'] = round(best_coe_m, 3)
        self.opt.coe[T]['n'] = round(best_coe_n, 3)

    def repairing_model(self, day_start_db, day_end):
        """Rebuild the training set for the given window and retrain ``fmi``.

        Side effects: ``material(..., is_repair=True)`` refreshes
        ``opt.mean`` / ``opt.std``; this method then updates
        ``opt.nwp_columns``, ``opt.env_columns`` and the two
        ``opt.Model['input_size_*']`` entries, and pushes ``opt`` to the
        collaborators via ``update_property``.
        """
        self.logger.info("------------进入FMI神经网络修模-------------")
        nwp, env, dq, rp, rp_his = self.material(day_start_db, day_end, is_repair=True)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        zone_columns = list(self.opt.zone.keys())
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE'] + zone_columns]
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        _, _, nwp_features = self.normalize(nwp, mean=mean, std=std)

        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)

        self.opt.nwp_columns = nwp_features.columns.to_list()
        self.opt.env_columns = env_features.columns.to_list()
        if 'C_REAL_VALUE' in self.opt.nwp_columns:
            self.opt.nwp_columns.remove('C_REAL_VALUE')
        # The "- 1" excludes one column from each count (C_TIME is still in
        # both lists at this point); zone columns are excluded for NWP too.
        self.opt.Model["input_size_nwp"] = len(self.opt.nwp_columns) - 1 - len(zone_columns)
        self.opt.Model["input_size_env"] = len(self.opt.env_columns) - 1
        self.update_property()

        data_train, env = self.process.get_train_data(nwp_features, env_features)
        train_X, valid_X, train_Y, valid_Y = self.features.get_train_data(data_train, env)
        # NOTE: reducing train_Y / valid_Y to their first column is disabled.
        self.fmi.training(self.opt, [train_X, train_Y, valid_X, valid_Y])

    def update_property(self):
        """Propagate the current ``opt`` to the collaborating objects."""
        self.va.set_lp()
        self.process.opt = self.opt
        self.features.opt = self.opt
        self.fix.opt = self.opt

    def material(self, begin, end, is_repair=False):
        """Load and align the CSV inputs for the ``begin``..``end`` window.

        Reads ``NWP.csv``, ``tower-1-process.csv``, ``power_filter3.csv``,
        ``dq.csv`` and ``z-power.csv`` from the ``data`` directory next to
        this file. Power and tower data start ``opt.Model['his_points'] / 4``
        hours before ``begin``; NWP data start at ``begin``.

        Args:
            begin: Window start (datetime-like with ``strftime``).
            end: Window end (datetime-like with ``strftime``).
            is_repair: When True, also recompute ``opt.mean`` / ``opt.std``
                from the loaded frames.

        Returns:
            Tuple ``(nwp, env, dq, rp, rp_his)`` of DataFrames.

        Raises:
            ValidationError: If the tower (environment) data is empty.
        """
        his_begin = (begin - pd.Timedelta(hours=self.opt.Model["his_points"] / 4)).strftime(
            '%Y-%m-%d %H:%M:%S')
        begin, end = begin.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')
        if is_repair is True:
            self.logger.info("修模的起始时间为:{}-{}".format(begin, end))
        else:
            self.logger.info("确认系数的起止时间为:{}-{}".format(begin, end))

        nwp = pd.read_csv(current_path + '/data/NWP.csv', parse_dates=['C_TIME'])
        env = pd.read_csv(current_path + '/data/tower-1-process.csv', parse_dates=['C_TIME'])
        rp = pd.read_csv(current_path + '/data/power_filter3.csv', parse_dates=['C_TIME'])
        dq = pd.read_csv(current_path + '/data/dq.csv', parse_dates=['C_TIME'])
        z_power = pd.read_csv(current_path + '/data/z-power.csv', parse_dates=['C_TIME'])

        rp['C_TIME'] = pd.to_datetime(rp['C_TIME'])
        rp.set_index('C_TIME', inplace=True)
        rp = rp.loc[his_begin: end].reset_index()
        # read_csv never returns None, so also test for an empty frame;
        # otherwise this guard could never fire.
        if env is None or env.empty:
            raise ValidationError("材料-环境数据为空")
        rp = pd.merge(rp, env.loc[:, ['C_TIME'] + [self.opt.usable_power["env"]]], on='C_TIME')
        rp_his = rp.copy()

        if self.target == 'C_ABLE_VALUE':
            rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
            rp.rename(columns={'C_ABLE_VALUE': 'C_REAL_VALUE'}, inplace=True)
            rp_his = rp.copy()
        else:
            prefix = '-训练集-' if is_repair is True else '-测试集-'
            plt_name = prefix + begin + '-' + end
            self.lp.weather_power = rp
            which = self.opt.usable_power["clean_power_which"]
            if which == 0:
                rp = self.lp.clean_limited_power_by_signal(plt_name)
            elif which == 1:
                rp = self.lp.clean_limited_power(plt_name, cluster=is_repair)
            elif which == 2:
                # Keep only timestamps that survive both cleaning methods.
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                common_times = set(rp_signal['C_TIME']) & set(rp_wind['C_TIME'])
                rp = rp_wind[rp_wind['C_TIME'].isin(common_times)]
            elif which == 3:
                # Drop rows where the reference power exceeds the real value
                # by at least 20% of cap.
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp = rp[rp['diff'] < 0.2 * self.opt.cap]
            elif which == 4:
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp_sample = rp[rp['diff'] < 0.2 * self.opt.cap]
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                common_times = set(rp_sample['C_TIME']) & set(rp_signal['C_TIME'])
                rp = rp_sample[rp_sample['C_TIME'].isin(common_times)]
            else:
                self.logger.info("不进行限电清洗")
            rp = rp.loc[:, ['C_TIME', 'C_REAL_VALUE']]
            rp = pd.merge(rp, z_power, on='C_TIME')
            # Rows removed by the cleaning fall back to C_ABLE_VALUE.
            rp_his = pd.merge(rp_his.loc[:, ['C_TIME', 'C_ABLE_VALUE']], rp,
                              on='C_TIME', how='left')
            rp_his['C_REAL_VALUE'] = rp_his['C_REAL_VALUE'].fillna(rp_his['C_ABLE_VALUE'])
            rp_his = pd.merge(rp_his, dq, on='C_TIME')
            rp_his = rp_his.loc[:, ['C_TIME', 'C_FP_VALUE', 'C_ABLE_VALUE', 'C_REAL_VALUE']]

        # NOTE(review): the nesting of this merge was ambiguous in the
        # received file; it is placed at function level here. Confirm the
        # C_ABLE_VALUE branch provides the columns it selects.
        dq = pd.merge(rp_his.loc[:, ['C_TIME', 'C_FP_VALUE']],
                      rp.loc[:, ['C_TIME', self.target] + list(self.opt.zone.keys())],
                      on='C_TIME')

        nwp.set_index('C_TIME', inplace=True)
        nwp = nwp.loc[begin: end].reset_index()
        env.set_index('C_TIME', inplace=True)
        env = env.loc[his_begin: end].reset_index()

        if is_repair:
            # Later frames overwrite earlier ones for shared column names.
            mean, std = {}, {}
            for df in [nwp, env, dq, rp, z_power]:
                m, s, _ = self.normalize(df)
                mean.update(m)
                std.update(s)
            self.opt.mean = {k: float(v) for k, v in mean.items()}
            self.opt.std = {k: float(v) for k, v in std.items()}
        return nwp, env, dq, rp, rp_his

    def saveVar(self, path, data):
        """Pickle ``data`` to ``path``, creating the parent directory."""
        os.makedirs(os.path.dirname(path), exist_ok=True)
        with open(path, 'wb') as file:
            pickle.dump(data, file)

    def normalize(self, df, drop_list=None, mean=None, std=None):
        """Standardize the columns of ``df`` that are not in ``drop_list``.

        Args:
            df: Input frame; it is not modified.
            drop_list: Columns left unscaled and re-inserted at their
                original positions. Defaults to ``['C_TIME']``.
            mean: Per-column means (positional list or Series). Computed
                from ``df`` when either ``mean`` or ``std`` is None.
            std: Per-column standard deviations, same convention as ``mean``.

        Returns:
            Tuple ``(mean, std, new)``. ``new`` is the scaled frame rounded
            to 3 decimals, or an empty list when ``df`` has no rows.
        """
        # None sentinel instead of a shared mutable default list.
        drop_list = ['C_TIME'] if drop_list is None else list(drop_list)
        df1 = df.copy()
        drop_index = [list(df1).index(x) for x in drop_list]
        df1.drop(drop_list, axis=1, inplace=True, errors='ignore')
        df1 = df1.apply(pd.to_numeric, errors='ignore')
        if mean is None or std is None:
            mean = np.mean(df1, axis=0).round(3)  # column means
            std = np.std(df1, axis=0).round(3)  # column standard deviations
        new = []
        for _, row in df1.iterrows():
            scaled = (row - mean) / std  # standardize one row
            new.append(scaled.round(3))
        if len(new) > 0:
            new = pd.concat(new, axis=1).T
            for index, col_name in zip(drop_index, drop_list):
                new.insert(index, col_name, df[col_name].values)
        return mean, std, new


if __name__ == '__main__':
    from config import myargparse
    from error import dqFix
    from process import preData
    from detect import Detection
    from logs import Log

    args = myargparse(discription="场站端配置", add_help=False)
    data = preData(args)
    detect = Detection(args)
    log = Log()
    fix = dqFix(args=args)
    # FIXME(review): Clock.__init__ takes (logger, args, va, process,
    # features, fmi, fix) but only five arguments are passed here, and Clock
    # defines no clustering() method, so this entry point cannot run as
    # written. The intended wiring is not visible in this file.
    clock = Clock(log, args, data, detect, fix)
    # NOTE: a direct clock.calculate_coe(...) call used to be made here.
    clock.clustering()