clocking.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/11/23 14:07
  4. # file: clocking.py
  5. # author: David
  6. # company: shenyang JY
  7. import sys
  8. from xml.dom import ValidationErr
  9. import pandas as pd
  10. import threading
  11. import datetime
  12. from cache.limited_power_curve import LimitPower
  13. from apscheduler.schedulers.background import BackgroundScheduler
  14. import time
  15. import os
  16. import pickle
  17. import numpy as np
  18. from validate import Validation, ValidationError
  19. np.random.seed(42)
  20. from cache.calculate import calculate_acc
  21. from cache.formula import Formulas
  22. from cache.inputData import DataBase
  23. from cache.monitor import Monitor
  24. current_path = os.path.dirname(__file__)
  25. from pytz import timezone
class Clock(object):
    """Scheduling facade for the station-side correction pipeline.

    Periodically re-fits the FMI neural-network correction model and
    re-tunes the per-point blending coefficients (``opt.coe``) between the
    network output (``dq_fix``) and the history-based correction.
    """

    def __init__(self, logger, args, va, process, features, fmi, fix):
        self.args = args
        self.va = va                      # validation helper; exposes set_lp()
        self.process = process            # data pre-processing pipeline
        self.features = features          # feature-extraction pipeline
        self.logger = logger
        self.opt = self.args.parse_args_and_yaml()   # merged CLI + YAML options
        self.mo = Monitor(logger, args)   # config monitor, refreshed by the scheduler
        self.fmi = fmi                    # neural-network model wrapper
        self.fix = fix                    # historical-error correction helper
        self.target = self.opt.predict    # column name used as the modelling target
        self.logger.info("---以 {} 进行修模---".format(self.target))
        self.lp = LimitPower(self.logger, self.args, None)  # curtailment (limited-power) cleaner
  40. def update_thread(self):
  41. thread = threading.Thread(target=self.start_jobs)
  42. thread.start()
  43. def start_jobs(self):
  44. scheduler = BackgroundScheduler()
  45. scheduler.configure({'timezone': timezone("Asia/Shanghai")})
  46. scheduler.add_job(func=self.calculate_coe, trigger="cron", hour=23, minute=0)
  47. scheduler.add_job(func=self.mo.update_config, trigger="interval", seconds=60)
  48. scheduler.start()
  49. def cal_acc(self, df, target, opt):
  50. df = df.copy()
  51. df.rename(columns={'C_REAL_VALUE': 'realValue'}, inplace=True)
  52. df['ableValue'] = df['realValue']
  53. df['forecastAbleValue'] = df[target]
  54. df = df.apply(pd.to_numeric, errors='ignore')
  55. df['C_TIME'] = pd.to_datetime(df['C_TIME'])
  56. acc = calculate_acc(df, opt=opt)
  57. return acc
  58. def date_diff(self, current_dt, repair_dt):
  59. difference = (current_dt - datetime.datetime.strptime(repair_dt, '%Y-%m-%d').date())
  60. return difference.days
    def calculate_coe(self, install=False):
        """Nightly job: optionally re-fit the model, then confirm the blending
        coefficients (m, n) for each of the 16 ultra-short-term forecast points.

        install=True forces a re-fit and uses the longer repair-history window.
        Updates self.opt.coe in place and persists options via save_args_yml.
        """
        # try:
        start = time.time()
        self.logger.info("检测系统当前的时间为:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start))))
        self.opt = self.args.parse_args_and_yaml()
        # self.opt.authentication['date'] = time.strftime('%Y-%m-%d', time.localtime(start))
        day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
        coe_start = day_end - pd.Timedelta(days=self.opt.update_coe_days)
        # Training window: longer history when installing, shorter for routine updates.
        day_start_db = coe_start - pd.Timedelta(days=self.opt.repair_days) if install is True else coe_start - pd.Timedelta(days=self.opt.update_add_train_days)
        # Never reach back before the full-capacity grid-connection date.
        if self.date_diff(day_start_db.date(), self.opt.authentication['full_cap']) < 0:
            day_start_db = datetime.datetime.strptime(self.opt.authentication['full_cap'], '%Y-%m-%d')
            self.logger.info("更新初始修模起始时间为全容量并网:{}".format(self.opt.authentication['full_cap']))
        # db = DataBase(begin=day_start_db, end=day_end, opt=self.opt, logger=self.logger)
        # db.data_process()
        # self.opt.cap = db.opt.cap
        self.args.save_args_yml(self.opt)
        formula = Formulas(self.opt)
        day = int(time.strftime('%d', time.localtime(start)))
        # Repair days of the month: every `repair_model_cycle` days (e.g. 5 -> 5, 10, ..., 30).
        repair, repairs = int(self.opt.repair_model_cycle), []
        terval = repair
        while repair <= 30:
            repairs.append(repair)
            repair += terval
        if day in repairs or install is True:
            # ------------------- model repair ------------------------
            self.repairing_model(day_start_db, coe_start)
            self.opt.authentication['repair'] = self.opt.authentication['date']
        self.args.save_args_yml(self.opt)
        self.logger.info("------------进入测试集自动计算-------------")
        coe_start += pd.Timedelta(days=1)
        nwp, env, dq, rp, rp_his = self.material(coe_start, day_end)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE']+list(self.opt.zone.keys())]
        # Normalize NWP features with the stored statistics (target/zone columns excluded).
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        _, _, nwp_features = self.normalize(nwp, ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys()), mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        data_test, env = self.process.get_test_data(nwp_features, env_features)
        test_X, data_Y = self.features.get_test_data(data_test, env)
        result = self.fmi.predict(test_X, batch_size=8)
        # 2. historical data
        for point in range(0, 16, 1):
            dfs_point = []
            for i, df in enumerate(data_Y):
                # NOTE(review): `result[1]` is independent of `point` although this runs
                # inside the per-point loop — looks like it may need `result[point]`; confirm.
                df[1]["dq_fix"] = result[1][i]
                dfs_point.append(df[1].iloc[point])
            pre_data = pd.concat(dfs_point, axis=1).T
            pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].apply(pd.to_numeric, errors='ignore')
            pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
            # De-normalize the network output back to power and clip to [0, cap].
            pre_data['dq_fix'] = pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
            pre_data['dq_fix'] = pre_data['dq_fix'].round(2)
            pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
            pre_data.loc[pre_data['dq_fix'] < 0, 'dq_fix'] = 0
            T = 'T' + str(point + 1)
            if self.opt.coe[T]['update'] is False:
                continue
            pre_data['history'] = self.fix.history_error_clock(pre_data, rp_his, 16 - point - 1)
            pre_data.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info("第{}点的时间周期为:{}-{}".format(T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
            self.update_coe(pre_data, formula, point, test=False)
        self.args.save_args_yml(self.opt)
        end = time.time()
        self.logger.info("定时任务:每天自动确认系数,用了 %s 秒 " % (end - start))
        # except Exception as e:
        #     self.logger.critical("定时任务出错:{}".format(e.args))
    def update_coe(self, pre_data, formula, point, test=False):
        """Grid-search the blending coefficients m, n for forecast point `point`.

        The confirmed forecast is m * dq_fix + n * history; m and n are swept
        over i/170 for i in 5..209 (~0.03 .. 1.23) and the pair maximizing the
        accuracy formula is stored in self.opt.coe['T<point+1>'].
        test=True evaluates against the cdq.csv archive instead.
        """
        T = 'T' + str(point + 1)
        if test is False:
            best, best_coe_m, best_coe_n = 0, 0, 0
            # Baseline accuracies: raw short-term forecast, network output, history correction.
            dq_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['C_FP_VALUE'].values)
            dq_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['dq_fix'].values)
            his_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['history'].values)
            # Exhaustive 205x205 sweep; accuracy evaluation dominates the cost.
            for i in range(5, 210):
                for j in range(5, 210):
                    pre_data["new"] = round(i / 170 * pre_data['dq_fix'] + j / 170 * pre_data['history'], 3)
                    acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            self.logger.info(
                "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期的准确率:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_acc, T, best, dq_fix_acc, his_fix_acc))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
        else:
            best, best_coe_m, best_coe_n = 0, 0, 0
            # Offline evaluation path against the archived ultra-short-term forecasts.
            cdq = pd.read_csv(current_path + '/data/cdq.csv', parse_dates=['C_TIME'])
            cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'])
            cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(point + 1)]
            cdq = pd.merge(cdq, pre_data, on='C_TIME')
            cdq['时间'] = cdq['C_TIME'].dt.strftime("%Y-%m-%d")
            # NOTE(review): hard-coded station metadata used as the scoring config — verify.
            cc = {
                'formulaType': 'DAY_SHORT_ACCURACY',
                'cap': '95.5',
                'province': 'E52',
                'electricType': 'E1',
                'stationCode': 'J00149'
            }
            import argparse
            config = argparse.Namespace(**cc)
            cdqs = cdq.groupby('时间')
            dq_accs, dq_fix_accs, cdq_accs, his_accs = [], [], [], []
            # Per-day accuracies for each candidate forecast column.
            for dt, group in cdqs:
                dq_acc = self.cal_acc(group, 'C_FP_VALUE', config)
                dq_fix_acc = self.cal_acc(group, 'dq_fix', config)
                cdq_acc = self.cal_acc(group, 'C_ABLE_VALUE', config)
                his_acc = self.cal_acc(group, 'history', config)
                dq_accs.append(dq_acc)
                dq_fix_accs.append(dq_fix_acc)
                cdq_accs.append(cdq_acc)
                his_accs.append(his_acc)
                print(dt, "这一天, 短期准确率:", dq_acc, "超短期公式准确率:", cdq_acc, "神经网络准确率:", dq_fix_acc,
                      "历史功率准确率:", his_acc)
            # NOTE(review): this branch scores with calculate_acc_south while the
            # live branch uses calculate_acc_northwest — confirm which region applies.
            for i in range(5, 210):
                for j in range(5, 210):
                    cdq["new"] = round(i / 170 * cdq['dq_fix'] + j / 170 * cdq['history'], 2)
                    acc = formula.calculate_acc_south(cdq['C_REAL_VALUE'].values, cdq['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            cdq['new'] = round(best_coe_m * cdq['dq_fix'] + best_coe_n * cdq['history'], 2)
            cdq.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info( "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(str(self.opt.update_coe_days), np.mean(dq_accs), T, best, np.mean(cdq_accs), np.mean(dq_fix_accs), np.mean(his_accs)))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
    def repairing_model(self, day_start_db, day_end):
        """Re-train the FMI network on data from [day_start_db, day_end].

        Loads materials with is_repair=True (which also refreshes the stored
        normalization statistics), normalizes features, records the resulting
        column lists and model input sizes in self.opt, and runs training.
        """
        self.logger.info("------------进入FMI神经网络修模-------------")
        nwp, env, dq, rp, rp_his = self.material(day_start_db, day_end, is_repair=True)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns+['C_REAL_VALUE']+list(self.opt.zone.keys())]
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        _, _, nwp_features = self.normalize(nwp, mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        # Persist the actual feature column order used for this training run.
        self.opt.nwp_columns = nwp_features.columns.to_list()
        self.opt.env_columns = env_features.columns.to_list()
        if 'C_REAL_VALUE' in self.opt.nwp_columns:
            self.opt.nwp_columns.pop(self.opt.nwp_columns.index('C_REAL_VALUE'))
        # Input sizes exclude C_TIME (and zone columns for the NWP branch).
        self.opt.Model["input_size_nwp"] = len(self.opt.nwp_columns) - 1 - len(self.opt.zone.keys())
        self.opt.Model["input_size_env"] = len(self.opt.env_columns) - 1
        self.update_property()
        data_train, env = self.process.get_train_data(nwp_features, env_features)
        train_X, valid_X, train_Y, valid_Y = self.features.get_train_data(data_train, env)
        # train_Y = [np.array([y[:, 0] for y in train_Y])]
        # valid_Y = [np.array([y[:, 0] for y in valid_Y])]
        self.fmi.training(self.opt, [train_X, train_Y, valid_X, valid_Y])
  219. def update_property(self):
  220. self.va.set_lp()
  221. self.process.opt = self.opt
  222. self.features.opt = self.opt
  223. self.fix.opt = self.opt
    def material(self, begin, end, is_repair=False):
        """Load and assemble NWP / tower-env / power / short-term-forecast data
        for [begin, end], cleaning curtailed (limited-power) periods.

        begin/end are datetimes; the history window opens his_points/4 hours
        before `begin` so the error-correction model has its lookback data.
        Returns (nwp, env, dq, rp, rp_his). With is_repair=True the stored
        normalization statistics (self.opt.mean / self.opt.std) are refreshed.
        """
        his_begin = (begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)).strftime('%Y-%m-%d %H:%M:%S')
        begin, end = begin.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')
        if is_repair is True:
            self.logger.info("修模的起始时间为:{}-{}".format(begin, end))
        else:
            self.logger.info("确认系数的起止时间为:{}-{}".format(begin, end))
        nwp = pd.read_csv(current_path + '/data/NWP.csv', parse_dates=['C_TIME'])
        env = pd.read_csv(current_path + '/data/tower-1-process.csv', parse_dates=['C_TIME'])
        rp = pd.read_csv(current_path + '/data/power_filter3.csv', parse_dates=['C_TIME'])
        dq = pd.read_csv(current_path + '/data/dq.csv', parse_dates=['C_TIME'])
        z_power = pd.read_csv(current_path + '/data/z-power.csv', parse_dates=['C_TIME'])
        rp['C_TIME'] = pd.to_datetime(rp['C_TIME'])
        rp.set_index('C_TIME', inplace=True)
        rp = rp.loc[his_begin: end].reset_index(inplace=False)
        # NOTE(review): pd.read_csv never returns None, so this guard is dead code — confirm intent.
        if env is None:
            raise ValidationError("材料-环境数据为空")
        rp = pd.merge(rp, env.loc[:, ['C_TIME'] + [self.opt.usable_power["env"]]], on='C_TIME')
        rp_his = rp.copy()
        if self.target == 'C_ABLE_VALUE':
            # Modelling against "able" power: make it the real-value column everywhere.
            rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
            rp.rename(columns={'C_ABLE_VALUE': 'C_REAL_VALUE'}, inplace=True)
            rp_his = rp.copy()
        else:
            plt_name = '-训练集-' + begin + '-' + end if is_repair is True else '-测试集-' + begin + '-' + end
            self.lp.weather_power = rp
            # clean_power_which selects the curtailment-cleaning strategy (0..4); anything else skips cleaning.
            if self.opt.usable_power["clean_power_which"] == 0:
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp = rp_signal
            elif self.opt.usable_power["clean_power_which"] == 1:
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                rp = rp_wind
            elif self.opt.usable_power["clean_power_which"] == 2:
                # Keep only timestamps that BOTH strategies consider unlimited.
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                time_intersection = set(rp_signal['C_TIME'])
                time_intersection.intersection_update(rp_wind['C_TIME'])
                rp = rp_wind[rp_wind['C_TIME'].isin(time_intersection)]
            elif self.opt.usable_power["clean_power_which"] == 3:
                # Sample-curve comparison: keep rows where reference power exceeds real power by < 20% of capacity.
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp = rp[rp['diff'] < 0.2*self.opt.cap]
            elif self.opt.usable_power["clean_power_which"] == 4:
                # Sample-curve filter intersected with the signal-based filter.
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp_sample = rp[rp['diff'] < 0.2 * self.opt.cap]
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                time_intersection = set(rp_sample['C_TIME'])
                time_intersection.intersection_update(rp_signal['C_TIME'])
                rp = rp_sample[rp_sample['C_TIME'].isin(time_intersection)]
            else:
                self.logger.info("不进行限电清洗")
        rp = rp.loc[: ,['C_TIME', 'C_REAL_VALUE']]
        rp = pd.merge(rp, z_power, on='C_TIME')
        # rp['C_REAL_VALUE'] = rp['C_REAL_VALUE'].apply(pd.to_numeric)
        # History keeps every timestamp; cleaned real values fill in where available.
        rp_his = pd.merge(rp_his.loc[:, ['C_TIME', 'C_ABLE_VALUE']], rp, on='C_TIME', how='left')
        rp_his['C_REAL_VALUE'] = rp_his['C_REAL_VALUE'].fillna(rp_his['C_ABLE_VALUE'])
        rp_his = pd.merge(rp_his, dq, on='C_TIME')
        rp_his = rp_his.loc[:, ['C_TIME', 'C_FP_VALUE', 'C_ABLE_VALUE', 'C_REAL_VALUE']]
        # dq = rp_his[rp_his['C_TIME'].isin(rp['C_TIME'])]
        # NOTE(review): when self.target == 'C_ABLE_VALUE' that column was renamed away above,
        # so selecting self.target from rp here looks inconsistent — confirm against callers.
        dq = pd.merge(rp_his.loc[:, ['C_TIME', 'C_FP_VALUE']], rp.loc[:, ['C_TIME', self.target]+list(self.opt.zone.keys())], on='C_TIME')
        nwp.set_index('C_TIME', inplace=True)
        nwp = nwp.loc[begin: end].reset_index()
        env.set_index('C_TIME', inplace=True)
        env = env.loc[his_begin: end].reset_index()
        if is_repair:
            # Refresh normalization statistics from the repair-window data.
            mean, std = {}, {}
            for df in [nwp, env, dq, rp, z_power]:
                m, s, _ = self.normalize(df)
                mean.update(m)
                std.update(s)
            self.opt.mean = {k: float(v) for k, v in mean.items()}
            self.opt.std = {k: float(v) for k, v in std.items()}
        return nwp, env, dq, rp, rp_his
  296. def saveVar(self, path, data):
  297. os.makedirs(os.path.dirname(path), exist_ok=True)
  298. with open(path, 'wb') as file:
  299. pickle.dump(data, file)
  300. def normalize(self, df, drop_list=['C_TIME'], mean=None, std=None):
  301. df1 = df.copy()
  302. drop_index = [list(df1).index(x) for x in drop_list]
  303. df1.drop(drop_list, axis=1, inplace=True, errors='ignore')
  304. df1 = df1.apply(pd.to_numeric, errors='ignore')
  305. if mean is None or std is None:
  306. mean = np.mean(df1, axis=0).round(3) # 数据的均值
  307. std = np.std(df1, axis=0).round(3) # 标准差
  308. new = []
  309. for i, row in df1.iterrows():
  310. d = (row - mean) / std # 归一化
  311. new.append(d.round(3))
  312. if len(new) > 0:
  313. new = pd.concat(new, axis=1).T
  314. for index, col_name in zip(drop_index, drop_list):
  315. new.insert(index, col_name, df[col_name].values)
  316. return mean, std, new
if __name__ == '__main__':
    # Ad-hoc manual entry point wiring the station-side components together.
    from config import myargparse
    from error import dqFix
    from process import preData
    from detect import Detection
    from logs import Log
    args = myargparse(discription="场站端配置", add_help=False)
    data = preData(args)
    detect = Detection(args)
    log = Log()
    fix = dqFix(args=args)
    # NOTE(review): Clock.__init__ takes (logger, args, va, process, features, fmi, fix)
    # — 7 arguments — but only 5 are passed here, so this call would raise TypeError; confirm wiring.
    clock = Clock(log, args, data, detect, fix)
    # clock.calculate_coe(cluster=True)
    # NOTE(review): no clustering() method is defined on Clock in this file — verify it exists elsewhere.
    clock.clustering()