# clocking.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/11/23 14:07
# file: clocking.py
# author: David
# company: shenyang JY
from xml.dom import ValidationErr  # NOTE(review): appears unused here; 'ValidationError' below comes from validate — confirm before removing
import pandas as pd
import threading
import datetime
from cache.limited_power_curve import LimitPower
from apscheduler.schedulers.background import BackgroundScheduler
import time
import os
import pickle
import numpy as np
from validate import Validation, ValidationError
# Fix the NumPy RNG seed so any stochastic steps (e.g. model repair/training) are reproducible.
np.random.seed(42)
from cache.calculate import calculate_acc
from cache.formula import Formulas
from cache.inputData import DataBase
from cache.monitor import Monitor
# Directory containing this file; used to resolve the relative ./data CSV paths below.
current_path = os.path.dirname(__file__)
from pytz import timezone
class Clock(object):
    """Scheduled driver that periodically repairs the FMI neural model and
    re-tunes the per-point blending coefficients stored in the YAML options."""

    def __init__(self, logger, args, va, process, features, fmi, fix):
        # Collaborators are injected by the caller; all are project components.
        self.args = args                  # argument / YAML configuration manager
        self.va = va                      # validation component (see update_property)
        self.process = process            # data pre-processing component
        self.features = features          # feature-extraction component
        self.logger = logger
        self.opt = self.args.parse_args_and_yaml()   # current options snapshot
        self.mo = Monitor(logger, args)              # config monitor, refreshed by the scheduler
        self.fmi = fmi                    # neural-network model wrapper
        self.fix = fix                    # history-error correction component
        self.target = self.opt.predict    # column name used as the modelling target
        self.logger.info("---以 {} 进行修模---".format(self.target))
        self.lp = LimitPower(self.logger, self.args, None)  # limited-power (curtailment) cleaner
  39. def update_thread(self):
  40. thread = threading.Thread(target=self.start_jobs)
  41. thread.start()
    def start_jobs(self):
        """Register and start the APScheduler background jobs:
        - calculate_coe: nightly at 23:00 (Asia/Shanghai timezone);
        - mo.update_config: refresh the configuration every 60 seconds.
        """
        scheduler = BackgroundScheduler()
        scheduler.configure({'timezone': timezone("Asia/Shanghai")})
        scheduler.add_job(func=self.calculate_coe, trigger="cron", hour=23, minute=0)
        scheduler.add_job(func=self.mo.update_config, trigger="interval", seconds=60)
        scheduler.start()
  48. def cal_acc(self, df, target, opt):
  49. df = df.copy()
  50. df.rename(columns={'C_REAL_VALUE': 'realValue'}, inplace=True)
  51. df['ableValue'] = df['realValue']
  52. df['forecastAbleValue'] = df[target]
  53. df = df.apply(pd.to_numeric, errors='ignore')
  54. df['C_TIME'] = pd.to_datetime(df['C_TIME'])
  55. acc = calculate_acc(df, opt=opt)
  56. return acc
  57. def date_diff(self, current_dt, repair_dt):
  58. difference = (current_dt - datetime.datetime.strptime(repair_dt, '%Y-%m-%d').date())
  59. return difference.days
    def calculate_coe(self, install=False):
        """Nightly job: optionally repair (retrain) the model, then walk the
        test window and re-fit, for each of the 16 ultra-short-term forecast
        points, the blending coefficients via update_coe.

        install=True forces a repair using the longer repair_days look-back.
        NOTE(review): nesting below was reconstructed from a flattened paste —
        confirm which statements sit inside the repair branch.
        """
        # try:
        start = time.time()
        self.logger.info("检测系统当前的时间为:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start))))
        self.opt = self.args.parse_args_and_yaml()
        # self.opt.authentication['date'] = time.strftime('%Y-%m-%d', time.localtime(start))
        day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
        coe_start = day_end - pd.Timedelta(days=self.opt.update_coe_days)
        # Training window start: longer look-back when (re)installing the model.
        day_start_db = coe_start - pd.Timedelta(days=self.opt.repair_days) if install is True else coe_start - pd.Timedelta(days=self.opt.update_add_train_days)
        # Never reach back before the full-capacity grid-connection date.
        if self.date_diff(day_start_db.date(), self.opt.authentication['full_cap']) < 0:
            day_start_db = datetime.datetime.strptime(self.opt.authentication['full_cap'], '%Y-%m-%d')
            self.logger.info("更新初始修模起始时间为全容量并网:{}".format(self.opt.authentication['full_cap']))
        # db = DataBase(begin=day_start_db, end=day_end, opt=self.opt, logger=self.logger)
        # db.data_process()
        # self.opt.cap = db.opt.cap
        self.args.save_args_yml(self.opt)
        formula = Formulas(self.opt)
        day = int(time.strftime('%d', time.localtime(start)))
        # Repair days are the multiples of repair_model_cycle within the month
        # (e.g. cycle 10 -> [10, 20, 30]).
        repair, repairs = int(self.opt.repair_model_cycle), []
        terval = repair
        while repair <= 30:
            repairs.append(repair)
            repair += terval
        if day in repairs or install is True:
            # ------------------- model repair ------------------------
            self.repairing_model(day_start_db, coe_start)
            self.opt.authentication['repair'] = self.opt.authentication['date']
            self.args.save_args_yml(self.opt)
        self.logger.info("------------进入测试集自动计算-------------")
        coe_start += pd.Timedelta(days=1)
        nwp, env, dq, rp, rp_his = self.material(coe_start, day_end)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE']+list(self.opt.zone.keys())]
        # Normalise NWP features with the stored stats (target & zone columns excluded).
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys())]
        _, _, nwp_features = self.normalize(nwp, ['C_TIME', 'C_REAL_VALUE']+list(self.opt.zone.keys()), mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        data_test, env = self.process.get_test_data(nwp_features, env_features)
        test_X, data_Y = self.features.get_test_data(data_test, env)
        result = self.fmi.predict(test_X, batch_size=8)
        # 2. history data: evaluate each of the 16 forecast points separately.
        for point in range(0, 16, 1):
            dfs_point = []
            for i, df in enumerate(data_Y):
                df[1]["dq_fix"] = result[1][i]
                dfs_point.append(df[1].iloc[point])
            pre_data = pd.concat(dfs_point, axis=1).T
            pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].apply(pd.to_numeric, errors='ignore')
            pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
            # De-normalise the network output back to power and clip to [0, cap].
            pre_data['dq_fix'] = pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
            pre_data['dq_fix'] = pre_data['dq_fix'].round(2)
            pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
            pre_data.loc[pre_data['dq_fix'] < 0, 'dq_fix'] = 0
            T = 'T' + str(point + 1)
            if self.opt.coe[T]['update'] is False:
                continue
            pre_data['history'] = self.fix.history_error_clock(pre_data, rp_his, 16 - point - 1)
            pre_data.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info("第{}点的时间周期为:{}-{}".format(T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
            self.update_coe(pre_data, formula, point, test=False)
        self.args.save_args_yml(self.opt)
        end = time.time()
        self.logger.info("定时任务:每天自动确认系数,用了 %s 秒 " % (end - start))
        # except Exception as e:
        #     self.logger.critical("定时任务出错:{}".format(e.args))
    def update_coe(self, pre_data, formula, point, test=False):
        """Grid-search the blending coefficients (m, n) for forecast point
        `point` so that m*dq_fix + n*history maximises accuracy, then store
        them in self.opt.coe['T<point+1>'].

        test=False: online path, north-west accuracy formula on pre_data.
        test=True : offline evaluation against data/cdq.csv with the south formula.
        """
        T = 'T' + str(point + 1)
        if test is False:
            best, best_coe_m, best_coe_n = 0, 0, 0
            # Baseline accuracies: raw short-term forecast, network fix, history fix.
            dq_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['C_FP_VALUE'].values)
            dq_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['dq_fix'].values)
            his_fix_acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['history'].values)
            # Exhaustive search over m, n in [5/170, 209/170] at 1/170 resolution.
            for i in range(5, 210):
                for j in range(5, 210):
                    pre_data["new"] = round(i / 170 * pre_data['dq_fix'] + j / 170 * pre_data['history'], 3)
                    acc = formula.calculate_acc_northwest(pre_data['C_REAL_VALUE'].values, pre_data['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            self.logger.info(
                "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期的准确率:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_acc, T, best, dq_fix_acc, his_fix_acc))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
        else:
            best, best_coe_m, best_coe_n = 0, 0, 0
            cdq = pd.read_csv(current_path + '/data/cdq.csv', parse_dates=['C_TIME'])
            cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'])
            # Keep only the rows forecast exactly point+1 steps ahead.
            cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(point + 1)]
            cdq = pd.merge(cdq, pre_data, on='C_TIME')
            cdq['时间'] = cdq['C_TIME'].dt.strftime("%Y-%m-%d")
            # Hard-coded station metadata used only by this offline evaluation path.
            cc = {
                'formulaType': 'DAY_SHORT_ACCURACY',
                'cap': '95.5',
                'province': 'E52',
                'electricType': 'E1',
                'stationCode': 'J00149'
            }
            import argparse
            config = argparse.Namespace(**cc)
            cdqs = cdq.groupby('时间')
            dq_accs, dq_fix_accs, cdq_accs, his_accs = [], [], [], []
            # Per-day accuracies for each candidate predictor.
            for dt, group in cdqs:
                dq_acc = self.cal_acc(group, 'C_FP_VALUE', config)
                dq_fix_acc = self.cal_acc(group, 'dq_fix', config)
                cdq_acc = self.cal_acc(group, 'C_ABLE_VALUE', config)
                his_acc = self.cal_acc(group, 'history', config)
                dq_accs.append(dq_acc)
                dq_fix_accs.append(dq_fix_acc)
                cdq_accs.append(cdq_acc)
                his_accs.append(his_acc)
                print(dt, "这一天, 短期准确率:", dq_acc, "超短期公式准确率:", cdq_acc, "神经网络准确率:", dq_fix_acc,
                      "历史功率准确率:", his_acc)
            # Same (m, n) grid search as the online path, but with the south formula.
            for i in range(5, 210):
                for j in range(5, 210):
                    cdq["new"] = round(i / 170 * cdq['dq_fix'] + j / 170 * cdq['history'], 2)
                    acc = formula.calculate_acc_south(cdq['C_REAL_VALUE'].values, cdq['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            cdq['new'] = round(best_coe_m * cdq['dq_fix'] + best_coe_n * cdq['history'], 2)
            cdq.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info( "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(str(self.opt.update_coe_days), np.mean(dq_accs), T, best, np.mean(cdq_accs), np.mean(dq_fix_accs), np.mean(his_accs)))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
    def repairing_model(self, day_start_db, day_end):
        """Retrain ("repair") the FMI network on the [day_start_db, day_end]
        window and refresh the stored feature-column lists, input sizes and
        normalisation statistics on self.opt."""
        self.logger.info("------------进入FMI神经网络修模-------------")
        nwp, env, dq, rp, rp_his = self.material(day_start_db, day_end, is_repair=True)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns+['C_REAL_VALUE']+list(self.opt.zone.keys())]
        # Normalise NWP and env frames with the freshly recomputed stats.
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        _, _, nwp_features = self.normalize(nwp, mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        # Persist the resulting feature column lists; the target column is not
        # a model input, so drop it from the NWP list.
        self.opt.nwp_columns = nwp_features.columns.to_list()
        self.opt.env_columns = env_features.columns.to_list()
        if 'C_REAL_VALUE' in self.opt.nwp_columns:
            self.opt.nwp_columns.pop(self.opt.nwp_columns.index('C_REAL_VALUE'))
        # Input sizes exclude C_TIME (and, for NWP, the per-zone columns).
        self.opt.Model["input_size_nwp"] = len(self.opt.nwp_columns) - 1 - len(self.opt.zone.keys())
        self.opt.Model["input_size_env"] = len(self.opt.env_columns) - 1
        self.update_property()
        data_train, env = self.process.get_train_data(nwp_features, env_features)
        train_X, valid_X, train_Y, valid_Y = self.features.get_train_data(data_train, env)
        # train_Y = [np.array([y[:, 0] for y in train_Y])]
        # valid_Y = [np.array([y[:, 0] for y in valid_Y])]
        self.fmi.training(self.opt, [train_X, train_Y, valid_X, valid_Y])
  218. def update_property(self):
  219. self.va.set_lp()
  220. self.process.opt = self.opt
  221. self.features.opt = self.opt
  222. self.fix.opt = self.opt
    def material(self, begin, end, is_repair=False):
        """Load and assemble the raw materials (NWP, env tower, real power,
        short-term forecast dq, zone power) for the [begin, end] window.

        his_begin extends `begin` backwards by Model['his_points'] quarter-hour
        steps so the history-based correction has enough look-back data.

        Returns (nwp, env, dq, rp, rp_his). When is_repair=True the per-column
        mean/std are recomputed over all materials and stored on self.opt.
        """
        his_begin = (begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)).strftime('%Y-%m-%d %H:%M:%S')
        begin, end = begin.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')
        if is_repair is True:
            self.logger.info("修模的起始时间为:{}-{}".format(begin, end))
        else:
            self.logger.info("确认系数的起止时间为:{}-{}".format(begin, end))
        nwp = pd.read_csv(current_path + '/data/NWP.csv', parse_dates=['C_TIME'])
        env = pd.read_csv(current_path + '/data/tower-1-process.csv', parse_dates=['C_TIME'])
        rp = pd.read_csv(current_path + '/data/power_filter3.csv', parse_dates=['C_TIME'])
        dq = pd.read_csv(current_path + '/data/dq.csv', parse_dates=['C_TIME'])
        z_power = pd.read_csv(current_path + '/data/z-power.csv', parse_dates=['C_TIME'])
        rp['C_TIME'] = pd.to_datetime(rp['C_TIME'])
        rp.set_index('C_TIME', inplace=True)
        rp = rp.loc[his_begin: end].reset_index(inplace=False)
        if env is None:
            raise ValidationError("材料-环境数据为空")
        # Attach the configured environment column to the power frame.
        rp = pd.merge(rp, env.loc[:, ['C_TIME'] + [self.opt.usable_power["env"]]], on='C_TIME')
        rp_his = rp.copy()
        if self.target == 'C_ABLE_VALUE':
            # Model on "able" power: substitute it for the real value.
            rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
            rp.rename(columns={'C_ABLE_VALUE': 'C_REAL_VALUE'}, inplace=True)
            rp_his = rp.copy()
        else:
            plt_name = '-训练集-' + begin + '-' + end if is_repair is True else '-测试集-' + begin + '-' + end
            self.lp.weather_power = rp
            # clean_power_which selects the curtailment-cleaning strategy:
            # 0 signal-based, 1 wind-curve, 2 both (time intersection),
            # 3 reference-sample residual, 4 sample residual ∩ signal, other: none.
            if self.opt.usable_power["clean_power_which"] == 0:
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp = rp_signal
            elif self.opt.usable_power["clean_power_which"] == 1:
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                rp = rp_wind
            elif self.opt.usable_power["clean_power_which"] == 2:
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp_wind = self.lp.clean_limited_power(plt_name, cluster=is_repair)
                time_intersection = set(rp_signal['C_TIME'])
                time_intersection.intersection_update(rp_wind['C_TIME'])
                rp = rp_wind[rp_wind['C_TIME'].isin(time_intersection)]
            elif self.opt.usable_power["clean_power_which"] == 3:
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp = rp[rp['diff'] < 0.2*self.opt.cap]
            elif self.opt.usable_power["clean_power_which"] == 4:
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp_sample = rp[rp['diff'] < 0.2 * self.opt.cap]
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                time_intersection = set(rp_sample['C_TIME'])
                time_intersection.intersection_update(rp_signal['C_TIME'])
                rp = rp_sample[rp_sample['C_TIME'].isin(time_intersection)]
            else:
                self.logger.info("不进行限电清洗")
        # NOTE(review): indentation reconstructed from a flattened paste — confirm
        # whether the two statements below belong inside the else-branch above.
        rp = rp.loc[: ,['C_TIME', 'C_REAL_VALUE']]
        rp = pd.merge(rp, z_power, on='C_TIME')
        # rp['C_REAL_VALUE'] = rp['C_REAL_VALUE'].apply(pd.to_numeric)
        # History frame: prefer the cleaned real power, fall back to able power.
        rp_his = pd.merge(rp_his.loc[:, ['C_TIME', 'C_ABLE_VALUE']], rp, on='C_TIME', how='left')
        rp_his['C_REAL_VALUE'] = rp_his['C_REAL_VALUE'].fillna(rp_his['C_ABLE_VALUE'])
        rp_his = pd.merge(rp_his, dq, on='C_TIME')
        rp_his = rp_his.loc[:, ['C_TIME', 'C_FP_VALUE', 'C_ABLE_VALUE', 'C_REAL_VALUE']]
        # dq = rp_his[rp_his['C_TIME'].isin(rp['C_TIME'])]
        dq = pd.merge(rp_his.loc[:, ['C_TIME', 'C_FP_VALUE']], rp.loc[:, ['C_TIME', self.target]+list(self.opt.zone.keys())], on='C_TIME')
        nwp.set_index('C_TIME', inplace=True)
        nwp = nwp.loc[begin: end].reset_index()
        env.set_index('C_TIME', inplace=True)
        env = env.loc[his_begin: end].reset_index()
        if is_repair:
            # Recompute and persist normalisation statistics over all materials.
            mean, std = {}, {}
            for df in [nwp, env, dq, rp, z_power]:
                m, s, _ = self.normalize(df)
                mean.update(m)
                std.update(s)
            self.opt.mean = {k: float(v) for k, v in mean.items()}
            self.opt.std = {k: float(v) for k, v in std.items()}
        return nwp, env, dq, rp, rp_his
  295. def saveVar(self, path, data):
  296. os.makedirs(os.path.dirname(path), exist_ok=True)
  297. with open(path, 'wb') as file:
  298. pickle.dump(data, file)
  299. def normalize(self, df, drop_list=['C_TIME'], mean=None, std=None):
  300. df1 = df.copy()
  301. drop_index = [list(df1).index(x) for x in drop_list]
  302. df1.drop(drop_list, axis=1, inplace=True, errors='ignore')
  303. df1 = df1.apply(pd.to_numeric, errors='ignore')
  304. if mean is None or std is None:
  305. mean = np.mean(df1, axis=0).round(3) # 数据的均值
  306. std = np.std(df1, axis=0).round(3) # 标准差
  307. new = []
  308. for i, row in df1.iterrows():
  309. d = (row - mean) / std # 归一化
  310. new.append(d.round(3))
  311. if len(new) > 0:
  312. new = pd.concat(new, axis=1).T
  313. for index, col_name in zip(drop_index, drop_list):
  314. new.insert(index, col_name, df[col_name].values)
  315. return mean, std, new
if __name__ == '__main__':
    # Ad-hoc manual entry point for running the clock locally.
    from config import myargparse
    from error import dqFix
    from process import preData
    from detect import Detection
    from logs import Log
    args = myargparse(discription="场站端配置", add_help=False)
    data = preData(args)
    detect = Detection(args)
    log = Log()
    fix = dqFix(args=args)
    # NOTE(review): Clock.__init__ takes (logger, args, va, process, features,
    # fmi, fix) — this call passes only 5 positional arguments and would raise
    # TypeError; confirm the intended wiring (va and fmi are missing).
    clock = Clock(log, args, data, detect, fix)
    # clock.calculate_coe(cluster=True)
    # NOTE(review): Clock defines no 'clustering' method in this file — this
    # call would raise AttributeError; verify the intended entry point.
    clock.clustering()