123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2023/11/23 14:07
- # file: clocking.py
- # author: David
- # company: shenyang JY
- import sys
- from xml.dom import ValidationErr
- import pandas as pd
- import threading
- import datetime
- from cache.limited_power_curve import LimitPower
- from apscheduler.schedulers.background import BackgroundScheduler
- import time
- import os
- import pickle
- import numpy as np
- from validate import Validation, ValidationError
- np.random.seed(42)
- from cache.calculate import calculate_acc
- from cache.formula import Formulas
- from cache.inputData import DataBase
- from cache.monitor import Monitor
- current_path = os.path.dirname(__file__)
- from pytz import timezone
class Clock(object):
    """Scheduling hub for the station-side forecast correction service.

    Periodically retrains the FMI correction model ("修模") and re-tunes the
    blending coefficients between the neural-network fix and the
    historical-power fix.  Collaborators (`va`, `process`, `features`, `fmi`,
    `fix`) are project objects; their contracts are not visible in this file.
    """

    def __init__(self, logger, args, va, process, features, fmi, fix):
        """Wire up collaborators and load the current configuration.

        :param logger: application logger.
        :param args: config accessor exposing parse_args_and_yaml()/save_args_yml().
        :param va: validation helper (exposes set_lp(); see update_property).
        :param process: data-preparation object (get_train_data/get_test_data).
        :param features: feature builder (get_train_data/get_test_data).
        :param fmi: neural-network model wrapper (training/predict).
        :param fix: error-correction helper (history_error_clock).
        """
        self.args = args
        self.va = va
        self.process = process
        self.features = features
        self.logger = logger
        # Parsed option namespace; refreshed again inside calculate_coe().
        self.opt = self.args.parse_args_and_yaml()
        self.mo = Monitor(logger, args)
        self.fmi = fmi
        self.fix = fix
        # Which column is modelled as the prediction target (e.g. C_REAL_VALUE
        # or C_ABLE_VALUE); drives the branching in material().
        self.target = self.opt.predict
        self.logger.info("---以 {} 进行修模---".format(self.target))
        self.lp = LimitPower(self.logger, self.args, None)
- def update_thread(self):
- thread = threading.Thread(target=self.start_jobs)
- thread.start()
    def start_jobs(self):
        """Register the recurring jobs and start the APScheduler instance.

        Runs in the Asia/Shanghai timezone:
        - calculate_coe: daily at 23:00 (coefficient confirmation / retrain).
        - mo.update_config: every 60 seconds (configuration refresh).
        """
        scheduler = BackgroundScheduler()
        scheduler.configure({'timezone': timezone("Asia/Shanghai")})
        # Daily coefficient confirmation at 23:00 local time.
        scheduler.add_job(func=self.calculate_coe, trigger="cron", hour=23, minute=0)
        # Lightweight config poll once a minute.
        scheduler.add_job(func=self.mo.update_config, trigger="interval", seconds=60)
        scheduler.start()
- def cal_acc(self, df, target, opt):
- df = df.copy()
- df.rename(columns={'C_REAL_VALUE': 'realValue'}, inplace=True)
- df['ableValue'] = df['realValue']
- df['forecastAbleValue'] = df[target]
- df = df.apply(pd.to_numeric, errors='ignore')
- df['C_TIME'] = pd.to_datetime(df['C_TIME'])
- acc = calculate_acc(df, opt=opt)
- return acc
- def date_diff(self, current_dt, repair_dt):
- difference = (current_dt - datetime.datetime.strptime(repair_dt, '%Y-%m-%d').date())
- return difference.days
    def calculate_coe(self, install=False):
        """Daily job: optionally retrain the model, then re-confirm the
        blending coefficients for each of the 16 forecast points.

        :param install: True for an install-time run — uses the longer
            repair_days history window and forces a retrain.
        """
        # try:
        start = time.time()
        self.logger.info("检测系统当前的时间为:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start))))
        # Re-read configuration so edits made since the last run take effect.
        self.opt = self.args.parse_args_and_yaml()
        # self.opt.authentication['date'] = time.strftime('%Y-%m-%d', time.localtime(start))
        day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
        # Coefficient-confirmation window: the last update_coe_days days.
        coe_start = day_end - pd.Timedelta(days=self.opt.update_coe_days)
        # Training data reaches further back for an install-time full repair.
        day_start_db = coe_start - pd.Timedelta(days=self.opt.repair_days) if install is True else coe_start - pd.Timedelta(days=self.opt.update_add_train_days)
        # Never start before the full-capacity grid-connection date.
        if self.date_diff(day_start_db.date(), self.opt.authentication['full_cap']) < 0:
            day_start_db = datetime.datetime.strptime(self.opt.authentication['full_cap'], '%Y-%m-%d')
            self.logger.info("更新初始修模起始时间为全容量并网:{}".format(self.opt.authentication['full_cap']))
        # db = DataBase(begin=day_start_db, end=day_end, opt=self.opt, logger=self.logger)
        # db.data_process()
        # self.opt.cap = db.opt.cap
        self.args.save_args_yml(self.opt)
        formula = Formulas(self.opt)
        day = int(time.strftime('%d', time.localtime(start)))
        # Days-of-month on which a retrain is due: every repair_model_cycle days.
        repair, repairs = int(self.opt.repair_model_cycle), []
        terval = repair
        while repair <= 30:
            repairs.append(repair)
            repair += terval
        if day in repairs or install is True:
            # ------------------- model repair (retrain) ------------------------
            self.repairing_model(day_start_db, coe_start)
            self.opt.authentication['repair'] = self.opt.authentication['date']
            self.args.save_args_yml(self.opt)
            # Kill the forecast process so it restarts with the fresh model.
            process_name = "forecast_ust"
            os.system(f"pkill {process_name}")
        self.logger.info("------------进入测试集自动计算-------------")
        coe_start += pd.Timedelta(days=1)
        nwp, env, dq, rp, rp_his = self.material(coe_start, day_end)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE']+list(self.opt.zone.keys())]
        # Normalise NWP with the stored per-column statistics (target and zone
        # columns excluded).
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        _, _, nwp_features = self.normalize(nwp, ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys()), mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        data_test, env = self.process.get_test_data(nwp_features, env_features)
        test_X, data_Y = self.features.get_test_data(data_test, env)
        result = self.fmi.predict(test_X, batch_size=8)
        # 2. historical data — evaluate every forecast point (T1..T16).
        for point in range(0, 16, 1):
            dfs_point = []
            # Collect row `point` of each sample window, tagged with the
            # model prediction.
            for i, df in enumerate(data_Y):
                df[1]["dq_fix"] = result[1][i]
                dfs_point.append(df[1].iloc[point])
            pre_data = pd.concat(dfs_point, axis=1).T
            pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].apply(pd.to_numeric, errors='ignore')
            pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
            # De-normalise the model output, then clip it into [0, cap].
            pre_data['dq_fix'] = pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
            pre_data['dq_fix'] = pre_data['dq_fix'].round(2)
            pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
            pre_data.loc[pre_data['dq_fix'] < 0, 'dq_fix'] = 0
            T = 'T' + str(point + 1)
            # Skip points whose coefficients are frozen in config.
            if self.opt.coe[T]['update'] is False:
                continue
            pre_data['history'] = self.fix.history_error_clock(pre_data, rp_his, 16 - point - 1)
            pre_data.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info("第{}点的时间周期为:{}-{}".format(T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
            self.update_coe(pre_data, formula, point, test=False)
        self.args.save_args_yml(self.opt)
        end = time.time()
        self.logger.info("定时任务:每天自动确认系数,用了 %s 秒 " % (end - start))
        # except Exception as e:
        #     self.logger.critical("定时任务出错:{}".format(e.args))
    def update_coe(self, pre_data, formula, point, test=False):
        """Grid-search the blend coefficients m, n for forecast point T(point+1).

        The blended forecast is m * dq_fix + n * history; both m and n are
        searched over {5/170 .. 209/170} and the best-accuracy pair is written
        back into self.opt.coe.

        :param pre_data: per-point frame with C_REAL_VALUE, C_FP_VALUE,
            dq_fix and history columns.
        :param formula: Formulas instance providing the accuracy metrics.
        :param point: zero-based forecast point index (0..15).
        :param test: False = production path (northwest metric); True =
            offline evaluation against the cdq.csv dump (south metric).
        """
        T = 'T' + str(point + 1)
        if test is False:
            best, best_coe_m, best_coe_n = 0, 0, 0
            # Baseline accuracies for logging: raw short-term forecast,
            # NN-fixed forecast, and history-based forecast.
            dq_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['C_FP_VALUE'].values)
            dq_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['dq_fix'].values)
            his_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['history'].values)
            # Exhaustive 205x205 search; coefficients are i/170, j/170.
            for i in range(5, 210):
                for j in range(5, 210):
                    pre_data["new"] = round(i / 170 * pre_data['dq_fix'] + j / 170 * pre_data['history'], 3)
                    acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            self.logger.info(
                "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期的准确率:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_acc, T, best, dq_fix_acc, his_fix_acc))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
        else:
            best, best_coe_m, best_coe_n = 0, 0, 0
            # Offline path: score against the recorded ultra-short-term dump.
            cdq = pd.read_csv(current_path + '/data/cdq.csv', parse_dates=['C_TIME'])
            cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'])
            # Keep only rows issued (point+1) steps ahead.
            cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(point + 1)]
            cdq = pd.merge(cdq, pre_data, on='C_TIME')
            cdq['时间'] = cdq['C_TIME'].dt.strftime("%Y-%m-%d")
            # Station metadata for the day-accuracy formula; values are
            # hard-coded for this station.
            cc = {
                'formulaType': 'DAY_SHORT_ACCURACY',
                'cap': '95.5',
                'province': 'E52',
                'electricType': 'E1',
                'stationCode': 'J00149'
            }
            import argparse
            config = argparse.Namespace(**cc)
            cdqs = cdq.groupby('时间')
            dq_accs, dq_fix_accs, cdq_accs, his_accs = [], [], [], []
            # Per-day accuracies for each candidate forecast source.
            for dt, group in cdqs:
                dq_acc = self.cal_acc(group, 'C_FP_VALUE', config)
                dq_fix_acc = self.cal_acc(group, 'dq_fix', config)
                cdq_acc = self.cal_acc(group, 'C_ABLE_VALUE', config)
                his_acc = self.cal_acc(group, 'history', config)
                dq_accs.append(dq_acc)
                dq_fix_accs.append(dq_fix_acc)
                cdq_accs.append(cdq_acc)
                his_accs.append(his_acc)
                print(dt, "这一天, 短期准确率:", dq_acc, "超短期公式准确率:", cdq_acc, "神经网络准确率:", dq_fix_acc,
                      "历史功率准确率:", his_acc)
            # Same exhaustive search as above, but with the south metric.
            for i in range(5, 210):
                for j in range(5, 210):
                    cdq["new"] = round(i / 170 * cdq['dq_fix'] + j / 170 * cdq['history'], 2)
                    acc = formula.calculate_acc_south(cdq['C_REAL_VALUE'].values, cdq['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            # Persist the blended series for inspection.
            cdq['new'] = round(best_coe_m * cdq['dq_fix'] + best_coe_n * cdq['history'], 2)
            cdq.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info( "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(str(self.opt.update_coe_days), np.mean(dq_accs), T, best, np.mean(cdq_accs), np.mean(dq_fix_accs), np.mean(his_accs)))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
    def repairing_model(self, day_start_db, day_end):
        """Retrain the FMI neural network on the [day_start_db, day_end] window.

        Recomputes normalisation statistics (material(..., is_repair=True)),
        rewrites the nwp/env column lists and model input sizes in self.opt,
        propagates the new opt to collaborators, then trains self.fmi.
        """
        self.logger.info("------------进入FMI神经网络修模-------------")
        nwp, env, dq, rp, rp_his = self.material(day_start_db, day_end, is_repair=True)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns+['C_REAL_VALUE']+list(self.opt.zone.keys())]
        # Normalise every non-time column (target included here, unlike the
        # test path in calculate_coe).
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        _, _, nwp_features = self.normalize(nwp, mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        # Record the post-normalisation column layout in the config.
        self.opt.nwp_columns = nwp_features.columns.to_list()
        self.opt.env_columns = env_features.columns.to_list()
        # The target column must not count as a model input feature.
        if 'C_REAL_VALUE' in self.opt.nwp_columns:
            self.opt.nwp_columns.pop(self.opt.nwp_columns.index('C_REAL_VALUE'))
        # Input sizes exclude C_TIME (the "- 1") and the zone columns.
        self.opt.Model["input_size_nwp"] = len(self.opt.nwp_columns) - 1 - len(self.opt.zone.keys())
        self.opt.Model["input_size_env"] = len(self.opt.env_columns) - 1
        self.update_property()
        data_train, env = self.process.get_train_data(nwp_features, env_features)
        train_X, valid_X, train_Y, valid_Y = self.features.get_train_data(data_train, env)
        # train_Y = [np.array([y[:, 0] for y in train_Y])]
        # valid_Y = [np.array([y[:, 0] for y in valid_Y])]
        self.fmi.training(self.opt, [train_X, train_Y, valid_X, valid_Y])
- def update_property(self):
- self.va.set_lp()
- self.process.opt = self.opt
- self.features.opt = self.opt
- self.fix.opt = self.opt
    def material(self, begin, end, is_repair=False):
        """Load and clean the raw inputs (NWP, tower env, real power, DQ, zone
        power) for one date window.

        :param begin: window start (datetime-like; supports strftime and
            pandas Timedelta arithmetic).
        :param end: window end (datetime-like).
        :param is_repair: True when preparing retraining data; also triggers
            recomputation of the normalisation statistics stored in self.opt.
        :return: tuple (nwp, env, dq, rp, rp_his) of DataFrames.
        """
        # Real power needs extra leading history: his_points quarter-hour steps.
        his_begin = (begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)).strftime('%Y-%m-%d %H:%M:%S')
        begin, end = begin.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')
        if is_repair is True:
            self.logger.info("修模的起始时间为:{}-{}".format(begin, end))
        else:
            self.logger.info("确认系数的起止时间为:{}-{}".format(begin, end))
        nwp = pd.read_csv(current_path + '/data/NWP.csv', parse_dates=['C_TIME'])
        env = pd.read_csv(current_path + '/data/tower-1-process.csv', parse_dates=['C_TIME'])
        rp = pd.read_csv(current_path + '/data/power_filter3.csv', parse_dates=['C_TIME'])
        dq = pd.read_csv(current_path + '/data/dq.csv', parse_dates=['C_TIME'])
        z_power = pd.read_csv(current_path + '/data/z-power.csv', parse_dates=['C_TIME'])
        rp['C_TIME'] = pd.to_datetime(rp['C_TIME'])
        rp.set_index('C_TIME', inplace=True)
        # Slice real power from his_begin (extra history) through end.
        rp = rp.loc[his_begin: end].reset_index(inplace=False)
        # NOTE(review): read_csv never returns None, so this guard looks
        # unreachable — confirm whether env can arrive None from elsewhere.
        if env is None:
            raise ValidationError("材料-环境数据为空")
        # Attach the configured usable-power environment signal to real power.
        rp = pd.merge(rp, env.loc[:, ['C_TIME'] + [self.opt.usable_power["env"]]], on='C_TIME')
        rp_his = rp.copy()
        if self.target == 'C_ABLE_VALUE':
            # Modelling "able" power: promote it to the C_REAL_VALUE slot.
            rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
            rp.rename(columns={'C_ABLE_VALUE': 'C_REAL_VALUE'}, inplace=True)
            rp_his = rp.copy()
        else:
            plt_name = '-训练集-' + begin + '-' + end if is_repair is True else '-测试集-' + begin + '-' + end
            self.lp.weather_power = rp
            # Curtailment (limited-power) cleaning strategy selector:
            # 0 = dispatch signal, 1 = weather curve, 2 = intersection of both,
            # 3 = sample-reference threshold, 4 = threshold ∩ signal,
            # anything else = no cleaning.
            if self.opt.usable_power["clean_power_which"] == 0:
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp = rp_signal
            elif self.opt.usable_power["clean_power_which"] == 1:
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                rp = rp_wind
            elif self.opt.usable_power["clean_power_which"] == 2:
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                # Keep only timestamps both cleaners accepted.
                time_intersection = set(rp_signal['C_TIME'])
                time_intersection.intersection_update(rp_wind['C_TIME'])
                rp = rp_wind[rp_wind['C_TIME'].isin(time_intersection)]
            elif self.opt.usable_power["clean_power_which"] == 3:
                # Keep rows where the sample-reference power is within 20% of
                # capacity above the measured value.
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp = rp[rp['diff'] < 0.2*self.opt.cap]
            elif self.opt.usable_power["clean_power_which"] == 4:
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp_sample = rp[rp['diff'] < 0.2 * self.opt.cap]
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                time_intersection = set(rp_sample['C_TIME'])
                time_intersection.intersection_update(rp_signal['C_TIME'])
                rp = rp_sample[rp_sample['C_TIME'].isin(time_intersection)]
            else:
                self.logger.info("不进行限电清洗")
        rp = rp.loc[: ,['C_TIME', 'C_REAL_VALUE']]
        # Zone-level power columns join the cleaned real power.
        rp = pd.merge(rp, z_power, on='C_TIME')
        # rp['C_REAL_VALUE'] = rp['C_REAL_VALUE'].apply(pd.to_numeric)
        # History frame keeps every timestamp; rows removed by cleaning fall
        # back to C_ABLE_VALUE as the "real" value.
        rp_his = pd.merge(rp_his.loc[:, ['C_TIME', 'C_ABLE_VALUE']], rp, on='C_TIME', how='left')
        rp_his['C_REAL_VALUE'] = rp_his['C_REAL_VALUE'].fillna(rp_his['C_ABLE_VALUE'])
        rp_his = pd.merge(rp_his, dq, on='C_TIME')
        rp_his = rp_his.loc[:, ['C_TIME', 'C_FP_VALUE', 'C_ABLE_VALUE', 'C_REAL_VALUE']]
        # dq = rp_his[rp_his['C_TIME'].isin(rp['C_TIME'])]
        # DQ keeps only cleaned timestamps, carrying target + zone columns.
        dq = pd.merge(rp_his.loc[:, ['C_TIME', 'C_FP_VALUE']], rp.loc[:, ['C_TIME', self.target]+list(self.opt.zone.keys())], on='C_TIME')
        nwp.set_index('C_TIME', inplace=True)
        nwp = nwp.loc[begin: end].reset_index()
        env.set_index('C_TIME', inplace=True)
        env = env.loc[his_begin: end].reset_index()
        if is_repair:
            # Refresh the normalisation statistics from all five frames.
            mean, std = {}, {}
            for df in [nwp, env, dq, rp, z_power]:
                m, s, _ = self.normalize(df)
                mean.update(m)
                std.update(s)
            self.opt.mean = {k: float(v) for k, v in mean.items()}
            self.opt.std = {k: float(v) for k, v in std.items()}
        return nwp, env, dq, rp, rp_his
- def saveVar(self, path, data):
- os.makedirs(os.path.dirname(path), exist_ok=True)
- with open(path, 'wb') as file:
- pickle.dump(data, file)
- def normalize(self, df, drop_list=['C_TIME'], mean=None, std=None):
- df1 = df.copy()
- drop_index = [list(df1).index(x) for x in drop_list]
- df1.drop(drop_list, axis=1, inplace=True, errors='ignore')
- df1 = df1.apply(pd.to_numeric, errors='ignore')
- if mean is None or std is None:
- mean = np.mean(df1, axis=0).round(3) # 数据的均值
- std = np.std(df1, axis=0).round(3) # 标准差
- new = []
- for i, row in df1.iterrows():
- d = (row - mean) / std # 归一化
- new.append(d.round(3))
- if len(new) > 0:
- new = pd.concat(new, axis=1).T
- for index, col_name in zip(drop_index, drop_list):
- new.insert(index, col_name, df[col_name].values)
- return mean, std, new
if __name__ == '__main__':
    # Ad-hoc manual entry point; imports are local so the module can be
    # imported by the service without pulling these in.
    from config import myargparse
    from error import dqFix
    from process import preData
    from detect import Detection
    from logs import Log
    args = myargparse(discription="场站端配置", add_help=False)
    data = preData(args)
    detect = Detection(args)
    log = Log()
    fix = dqFix(args=args)
    # NOTE(review): Clock.__init__ requires (logger, args, va, process,
    # features, fmi, fix) — this call supplies only 5 of the 7 required
    # arguments and would raise TypeError; confirm the intended wiring.
    clock = Clock(log, args, data, detect, fix)
    # clock.calculate_coe(cluster=True)
    # NOTE(review): no `clustering` method is defined on Clock in this file —
    # confirm where it is meant to come from.
    clock.clustering()
|