# clocking.py
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/11/23 14:07
  4. # file: clocking.py
  5. # author: David
  6. # company: shenyang JY
  7. import sys
  8. from xml.dom import ValidationErr
  9. import pandas as pd
  10. import threading
  11. import datetime
  12. from cache.limited_power_curve import LimitPower
  13. from apscheduler.schedulers.background import BackgroundScheduler
  14. import time
  15. import os
  16. import pickle
  17. import numpy as np
  18. from validate import Validation, ValidationError
  19. np.random.seed(42)
  20. from cache.calculate import calculate_acc
  21. from cache.formula import Formulas
  22. from cache.inputData import DataBase
  23. from cache.monitor import Monitor
  24. current_path = os.path.dirname(__file__)
  25. from pytz import timezone
  26. class Clock(object):
  27. def __init__(self, logger, args, va, process, features, fmi, fix):
  28. self.args = args
  29. self.va = va
  30. self.process = process
  31. self.features = features
  32. self.logger = logger
  33. self.opt = self.args.parse_args_and_yaml()
  34. self.mo = Monitor(logger, args)
  35. self.fmi = fmi
  36. self.fix = fix
  37. self.target = self.opt.predict
  38. self.logger.info("---以 {} 进行修模---".format(self.target))
  39. self.lp = LimitPower(self.logger, self.args, None)
  40. def update_thread(self):
  41. thread = threading.Thread(target=self.start_jobs)
  42. thread.start()
  43. def start_jobs(self):
  44. scheduler = BackgroundScheduler()
  45. scheduler.configure({'timezone': timezone("Asia/Shanghai")})
  46. scheduler.add_job(func=self.calculate_coe, trigger="cron", hour=23, minute=0)
  47. scheduler.add_job(func=self.mo.update_config, trigger="interval", seconds=60)
  48. scheduler.start()
  49. def cal_acc(self, df, target, opt):
  50. df = df.copy()
  51. df.rename(columns={'C_REAL_VALUE': 'realValue'}, inplace=True)
  52. df['ableValue'] = df['realValue']
  53. df['forecastAbleValue'] = df[target]
  54. df = df.apply(pd.to_numeric, errors='ignore')
  55. df['C_TIME'] = pd.to_datetime(df['C_TIME'])
  56. acc = calculate_acc(df, opt=opt)
  57. return acc
  58. def date_diff(self, current_dt, repair_dt):
  59. difference = (current_dt - datetime.datetime.strptime(repair_dt, '%Y-%m-%d').date())
  60. return difference.days
    def calculate_coe(self, install=False):
        """Nightly job: refresh config, optionally re-fit the model, then
        re-confirm the dq_fix/history blending coefficients per time point.

        Args:
            install: True on first installation — forces a model repair and
                uses the longer ``repair_days`` training window.
        """
        # try:
        start = time.time()
        self.logger.info("检测系统当前的时间为:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start))))
        # Re-read options so the nightly run picks up config edits.
        self.opt = self.args.parse_args_and_yaml()
        # self.opt.authentication['date'] = time.strftime('%Y-%m-%d', time.localtime(start))
        day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
        coe_start = day_end - pd.Timedelta(days=self.opt.update_coe_days)
        # The training window reaches further back than the coefficient window.
        day_start_db = coe_start - pd.Timedelta(days=self.opt.repair_days) if install is True else coe_start - pd.Timedelta(days=self.opt.update_add_train_days)
        # Never start before the full-capacity grid-connection date.
        if self.date_diff(day_start_db.date(), self.opt.authentication['full_cap']) < 0:
            day_start_db = datetime.datetime.strptime(self.opt.authentication['full_cap'], '%Y-%m-%d')
            self.logger.info("更新初始修模起始时间为全容量并网:{}".format(self.opt.authentication['full_cap']))
        # db = DataBase(begin=day_start_db, end=day_end, opt=self.opt, logger=self.logger)
        # db.data_process()
        # self.opt.cap = db.opt.cap
        self.args.save_args_yml(self.opt)
        formula = Formulas(self.opt)
        day = int(time.strftime('%d', time.localtime(start)))
        # Repair runs on days-of-month that are multiples of
        # repair_model_cycle (capped at 30).
        repair, repairs = int(self.opt.repair_model_cycle), []
        terval = repair
        while repair <= 30:
            repairs.append(repair)
            repair += terval
        if day in repairs or install is True:
            # ---- model repair (re-fit the neural network) ----
            self.repairing_model(day_start_db, coe_start)
            self.opt.authentication['repair'] = self.opt.authentication['date']
            self.args.save_args_yml(self.opt)
            # Kill the forecast process so it restarts with the repaired model.
            process_name = "forecast_ust"
            os.system(f"pkill {process_name}")
        self.logger.info("------------进入测试集自动计算-------------")
        coe_start += pd.Timedelta(days=1)
        nwp, env, dq, rp, rp_his = self.material(coe_start, day_end)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE']+list(self.opt.zone.keys())]
        # Normalize NWP features with the stored statistics; time, target and
        # zone columns are left un-normalized.
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        _, _, nwp_features = self.normalize(nwp, ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys()), mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        data_test, env = self.process.get_test_data(nwp_features, env_features)
        test_X, data_Y = self.features.get_test_data(data_test, env)
        result = self.fmi.predict(test_X, batch_size=8)
        # 2. historical data: build one evaluation frame per forecast point.
        # NOTE(review): assumes data_Y items are (key, DataFrame) pairs and
        # result[1] aligns index-wise with data_Y — confirm against
        # features.get_test_data / fmi.predict.
        for point in range(0, 16, 1):
            dfs_point = []
            for i, df in enumerate(data_Y):
                df[1]["dq_fix"] = result[1][i]
                dfs_point.append(df[1].iloc[point])
            pre_data = pd.concat(dfs_point, axis=1).T
            pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].apply(pd.to_numeric, errors='ignore')
            pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
            # De-normalize the network output, then clip into [0, cap].
            pre_data['dq_fix'] = pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
            pre_data['dq_fix'] = pre_data['dq_fix'].round(2)
            pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
            pre_data.loc[pre_data['dq_fix'] < 0, 'dq_fix'] = 0
            T = 'T' + str(point + 1)
            if self.opt.coe[T]['update'] is False:
                continue
            pre_data['history'] = self.fix.history_error_clock(pre_data, rp_his, 16 - point - 1)
            pre_data.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info("第{}点的时间周期为:{}-{}".format(T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
            self.update_coe(pre_data, formula, point, test=False)
        self.args.save_args_yml(self.opt)
        end = time.time()
        self.logger.info("定时任务:每天自动确认系数,用了 %s 秒 " % (end - start))
        # except Exception as e:
        #     self.logger.critical("定时任务出错:{}".format(e.args))
    def update_coe(self, pre_data, formula, point, test=False):
        """Grid-search blending coefficients m, n for one forecast point.

        The blended forecast is ``new = m * dq_fix + n * history`` with m
        and n swept over 5/170 .. 209/170; the best pair (by the formula's
        accuracy metric) is stored in ``self.opt.coe['T<point+1>']``.

        Args:
            pre_data: per-point frame with C_REAL_VALUE, C_FP_VALUE,
                dq_fix and history columns.
            formula: Formulas instance providing the accuracy metrics.
            point: 0-based forecast point index (T = point + 1).
            test: False → score pre_data with the northwest metric;
                True → offline evaluation against data/cdq.csv using the
                south metric and per-day accuracy reporting.
        """
        T = 'T' + str(point + 1)
        if test is False:
            best, best_coe_m, best_coe_n = 0, 0, 0
            # Baseline accuracies: raw short-term, network fix, history fix.
            dq_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['C_FP_VALUE'].values)
            dq_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['dq_fix'].values)
            his_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['history'].values)
            # Exhaustive sweep over the (m, n) grid.
            for i in range(5, 210):
                for j in range(5, 210):
                    pre_data["new"] = round(i / 170 * pre_data['dq_fix'] + j / 170 * pre_data['history'], 3)
                    acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            self.logger.info(
                "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期的准确率:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_acc, T, best, dq_fix_acc, his_fix_acc))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
        else:
            best, best_coe_m, best_coe_n = 0, 0, 0
            # Offline evaluation against the recorded ultra-short-term file,
            # restricted to forecasts issued point+1 steps ahead.
            cdq = pd.read_csv(current_path + '/data/cdq.csv', parse_dates=['C_TIME'])
            cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'])
            cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(point + 1)]
            cdq = pd.merge(cdq, pre_data, on='C_TIME')
            cdq['时间'] = cdq['C_TIME'].dt.strftime("%Y-%m-%d")
            # Hard-coded station metadata used only by this offline path.
            cc = {
                'formulaType': 'DAY_SHORT_ACCURACY',
                'cap': '95.5',
                'province': 'E52',
                'electricType': 'E1',
                'stationCode': 'J00149'
            }
            import argparse
            config = argparse.Namespace(**cc)
            # Per-day accuracy of each candidate forecast column.
            cdqs = cdq.groupby('时间')
            dq_accs, dq_fix_accs, cdq_accs, his_accs = [], [], [], []
            for dt, group in cdqs:
                dq_acc = self.cal_acc(group, 'C_FP_VALUE', config)
                dq_fix_acc = self.cal_acc(group, 'dq_fix', config)
                cdq_acc = self.cal_acc(group, 'C_ABLE_VALUE', config)
                his_acc = self.cal_acc(group, 'history', config)
                dq_accs.append(dq_acc)
                dq_fix_accs.append(dq_fix_acc)
                cdq_accs.append(cdq_acc)
                his_accs.append(his_acc)
                print(dt, "这一天, 短期准确率:", dq_acc, "超短期公式准确率:", cdq_acc, "神经网络准确率:", dq_fix_acc,
                      "历史功率准确率:", his_acc)
            # Same (m, n) sweep as the live path, but scored with the
            # south-region metric.
            for i in range(5, 210):
                for j in range(5, 210):
                    cdq["new"] = round(i / 170 * cdq['dq_fix'] + j / 170 * cdq['history'], 2)
                    acc = formula.calculate_acc_south(cdq['C_REAL_VALUE'].values, cdq['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            cdq['new'] = round(best_coe_m * cdq['dq_fix'] + best_coe_n * cdq['history'], 2)
            cdq.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info( "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(str(self.opt.update_coe_days), np.mean(dq_accs), T, best, np.mean(cdq_accs), np.mean(dq_fix_accs), np.mean(his_accs)))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
    def repairing_model(self, day_start_db, day_end):
        """Re-fit (repair) the FMI neural network on the given date window.

        Loads materials, normalizes NWP and environment features with the
        freshly recomputed statistics, updates the feature-column lists and
        model input sizes in ``self.opt``, then trains ``self.fmi``.

        Args:
            day_start_db: training window start (datetime).
            day_end: training window end (datetime).
        """
        self.logger.info("------------进入FMI神经网络修模-------------")
        # is_repair=True also recomputes and stores mean/std in self.opt.
        nwp, env, dq, rp, rp_his = self.material(day_start_db, day_end, is_repair=True)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns+['C_REAL_VALUE']+list(self.opt.zone.keys())]
        # Unlike the test path, the target column is normalized here too.
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        _, _, nwp_features = self.normalize(nwp, mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        # Persist the actual feature column order and derived input sizes.
        self.opt.nwp_columns = nwp_features.columns.to_list()
        self.opt.env_columns = env_features.columns.to_list()
        if 'C_REAL_VALUE' in self.opt.nwp_columns:
            self.opt.nwp_columns.pop(self.opt.nwp_columns.index('C_REAL_VALUE'))
        # -1 excludes C_TIME; zone columns are excluded from the NWP input.
        self.opt.Model["input_size_nwp"] = len(self.opt.nwp_columns) - 1 - len(self.opt.zone.keys())
        self.opt.Model["input_size_env"] = len(self.opt.env_columns) - 1
        self.update_property()
        data_train, env = self.process.get_train_data(nwp_features, env_features)
        train_X, valid_X, train_Y, valid_Y = self.features.get_train_data(data_train, env)
        # train_Y = [np.array([y[:, 0] for y in train_Y])]
        # valid_Y = [np.array([y[:, 0] for y in valid_Y])]
        self.fmi.training(self.opt, [train_X, train_Y, valid_X, valid_Y])
  221. def update_property(self):
  222. self.va.set_lp()
  223. self.process.opt = self.opt
  224. self.features.opt = self.opt
  225. self.fix.opt = self.opt
    def material(self, begin, end, is_repair=False):
        """Load and clean the raw materials (NWP, tower env, power, dq).

        Args:
            begin: window start (datetime); the history window additionally
                reaches back his_points/4 hours before it.
            end: window end (datetime).
            is_repair: True when preparing training data for model repair —
                also recomputes and stores normalization statistics.

        Returns:
            (nwp, env, dq, rp, rp_his) DataFrames.
        """
        his_begin = (begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)).strftime('%Y-%m-%d %H:%M:%S')
        begin, end = begin.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')
        if is_repair is True:
            self.logger.info("修模的起始时间为:{}-{}".format(begin, end))
        else:
            self.logger.info("确认系数的起止时间为:{}-{}".format(begin, end))
        nwp = pd.read_csv(current_path + '/data/NWP.csv', parse_dates=['C_TIME'])
        env = pd.read_csv(current_path + '/data/tower-1-process.csv', parse_dates=['C_TIME'])
        rp = pd.read_csv(current_path + '/data/power_filter3.csv', parse_dates=['C_TIME'])
        dq = pd.read_csv(current_path + '/data/dq.csv', parse_dates=['C_TIME'])
        z_power = pd.read_csv(current_path + '/data/z-power.csv', parse_dates=['C_TIME'])
        rp['C_TIME'] = pd.to_datetime(rp['C_TIME'])
        rp.set_index('C_TIME', inplace=True)
        rp = rp.loc[his_begin: end].reset_index(inplace=False)
        if env is None:
            raise ValidationError("材料-环境数据为空")
        # Attach the configured environment column used for usable-power cleaning.
        rp = pd.merge(rp, env.loc[:, ['C_TIME'] + [self.opt.usable_power["env"]]], on='C_TIME')
        rp_his = rp.copy()
        if self.target == 'C_ABLE_VALUE':
            # Model against "able" power: substitute it for the real value.
            rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
            rp.rename(columns={'C_ABLE_VALUE': 'C_REAL_VALUE'}, inplace=True)
            rp_his = rp.copy()
        else:
            plt_name = '-训练集-' + begin + '-' + end if is_repair is True else '-测试集-' + begin + '-' + end
            self.lp.weather_power = rp
            # clean_power_which selects the limited-power cleaning strategy:
            # 0 signal-based, 1 power-curve-based, 2 intersection of both,
            # 3 sample-reference deviation, 4 sample-reference ∩ signal.
            if self.opt.usable_power["clean_power_which"] == 0:
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp = rp_signal
            elif self.opt.usable_power["clean_power_which"] == 1:
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                rp = rp_wind
            elif self.opt.usable_power["clean_power_which"] == 2:
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                time_intersection = set(rp_signal['C_TIME'])
                time_intersection.intersection_update(rp_wind['C_TIME'])
                rp = rp_wind[rp_wind['C_TIME'].isin(time_intersection)]
            elif self.opt.usable_power["clean_power_which"] == 3:
                # Keep rows where the sample reference exceeds reality by
                # less than 20% of capacity.
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp = rp[rp['diff'] < 0.2*self.opt.cap]
            elif self.opt.usable_power["clean_power_which"] == 4:
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp_sample = rp[rp['diff'] < 0.2 * self.opt.cap]
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                time_intersection = set(rp_sample['C_TIME'])
                time_intersection.intersection_update(rp_signal['C_TIME'])
                rp = rp_sample[rp_sample['C_TIME'].isin(time_intersection)]
            else:
                self.logger.info("不进行限电清洗")
            rp = rp.loc[: ,['C_TIME', 'C_REAL_VALUE']]
            rp = pd.merge(rp, z_power, on='C_TIME')
            # rp['C_REAL_VALUE'] = rp['C_REAL_VALUE'].apply(pd.to_numeric)
            # History keeps every timestamp; cleaned-away real values fall
            # back to the "able" value.
            rp_his = pd.merge(rp_his.loc[:, ['C_TIME', 'C_ABLE_VALUE']], rp, on='C_TIME', how='left')
            rp_his['C_REAL_VALUE'] = rp_his['C_REAL_VALUE'].fillna(rp_his['C_ABLE_VALUE'])
            rp_his = pd.merge(rp_his, dq, on='C_TIME')
            rp_his = rp_his.loc[:, ['C_TIME', 'C_FP_VALUE', 'C_ABLE_VALUE', 'C_REAL_VALUE']]
            # dq = rp_his[rp_his['C_TIME'].isin(rp['C_TIME'])]
            dq = pd.merge(rp_his.loc[:, ['C_TIME', 'C_FP_VALUE']], rp.loc[:, ['C_TIME', self.target]+list(self.opt.zone.keys())], on='C_TIME')
        nwp.set_index('C_TIME', inplace=True)
        nwp = nwp.loc[begin: end].reset_index()
        env.set_index('C_TIME', inplace=True)
        env = env.loc[his_begin: end].reset_index()
        if is_repair:
            # Recompute normalization statistics over all materials and
            # persist them on the option object.
            mean, std = {}, {}
            for df in [nwp, env, dq, rp, z_power]:
                m, s, _ = self.normalize(df)
                mean.update(m)
                std.update(s)
            self.opt.mean = {k: float(v) for k, v in mean.items()}
            self.opt.std = {k: float(v) for k, v in std.items()}
        return nwp, env, dq, rp, rp_his
  298. def saveVar(self, path, data):
  299. os.makedirs(os.path.dirname(path), exist_ok=True)
  300. with open(path, 'wb') as file:
  301. pickle.dump(data, file)
  302. def normalize(self, df, drop_list=['C_TIME'], mean=None, std=None):
  303. df1 = df.copy()
  304. drop_index = [list(df1).index(x) for x in drop_list]
  305. df1.drop(drop_list, axis=1, inplace=True, errors='ignore')
  306. df1 = df1.apply(pd.to_numeric, errors='ignore')
  307. if mean is None or std is None:
  308. mean = np.mean(df1, axis=0).round(3) # 数据的均值
  309. std = np.std(df1, axis=0).round(3) # 标准差
  310. new = []
  311. for i, row in df1.iterrows():
  312. d = (row - mean) / std # 归一化
  313. new.append(d.round(3))
  314. if len(new) > 0:
  315. new = pd.concat(new, axis=1).T
  316. for index, col_name in zip(drop_index, drop_list):
  317. new.insert(index, col_name, df[col_name].values)
  318. return mean, std, new
# Manual entry point for running the clock driver stand-alone.
if __name__ == '__main__':
    from config import myargparse
    from error import dqFix
    from process import preData
    from detect import Detection
    from logs import Log
    args = myargparse(discription="场站端配置", add_help=False)
    data = preData(args)
    detect = Detection(args)
    log = Log()
    fix = dqFix(args=args)
    # NOTE(review): Clock.__init__ takes (logger, args, va, process,
    # features, fmi, fix) — this call passes only 5 arguments and would
    # raise TypeError; confirm the intended wiring of features/fmi.
    clock = Clock(log, args, data, detect, fix)
    # clock.calculate_coe(cluster=True)
    # NOTE(review): Clock defines no clustering() method in this file —
    # presumably implemented elsewhere or stale; verify before running.
    clock.clustering()