# clocking_acc.py (16 KB)
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/11/23 14:07
# file: clocking.py
# author: David
# company: shenyang JY
import pandas as pd
import threading
import datetime
from cache.limited_power import LimitPower
from apscheduler.schedulers.background import BackgroundScheduler
import time
import os
import pickle
import numpy as np
np.random.seed(42)  # fix the global NumPy seed so repeated runs are reproducible
from cache.calculate import calculate_acc
from cache.formula import Formulas
from cache.inputData import DataBase
from cache.monitor import Monitor
current_path = os.path.dirname(__file__)  # directory of this file; used to locate the data/ CSVs
from pytz import timezone
  23. class Clock(object):
  24. def __init__(self, logger, args, va, process, features, fmi, fix):
  25. self.args = args
  26. self.va = va
  27. self.process = process
  28. self.features = features
  29. self.logger = logger
  30. self.opt = self.args.parse_args_and_yaml()
  31. self.mo = Monitor(logger, args)
  32. self.fmi = fmi
  33. self.fix = fix
  34. self.target = self.opt.predict
  35. self.logger.info("---以 {} 进行修模---".format(self.target))
  36. def update_thread(self):
  37. thread = threading.Thread(target=self.start_jobs)
  38. thread.start()
  39. def start_jobs(self):
  40. scheduler = BackgroundScheduler()
  41. scheduler.configure({'timezone': timezone("Asia/Shanghai")})
  42. scheduler.add_job(func=self.calculate_coe, trigger="cron", hour=23, minute=0)
  43. scheduler.add_job(func=self.mo.update_config, trigger="interval", seconds=60)
  44. scheduler.start()
  45. def cal_acc(self, df, target, opt):
  46. df = df.copy()
  47. df.rename(columns={'C_REAL_VALUE': 'realValue'}, inplace=True)
  48. df['ableValue'] = df['realValue']
  49. df['forecastAbleValue'] = df[target]
  50. df = df.apply(pd.to_numeric, errors='ignore')
  51. df['C_TIME'] = pd.to_datetime(df['C_TIME'])
  52. acc = calculate_acc(df, opt=opt)
  53. return acc
  54. def date_diff(self, current_dt, repair_dt):
  55. difference = (current_dt - datetime.datetime.strptime(repair_dt, '%Y-%m-%d').date())
  56. return difference.days
    def calculate_coe(self, install=False):
        """Nightly job: optionally re-fit the FMI model, then grid-search the
        blending coefficients m/n for each of the 16 forecast points.

        Args:
            install: True forces a model repair regardless of the calendar-day
                repair cycle (used at installation time).

        All failures are caught and logged via logger.critical so a bad run
        never crashes the scheduler thread.
        """
        try:
            start = time.time()
            self.logger.info("检测系统当前的时间为:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start))))
            # Re-read the YAML-backed options so each run sees the latest config.
            self.opt = self.args.parse_args_and_yaml()
            self.opt.authentication['date'] = time.strftime('%Y-%m-%d', time.localtime(start))
            day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
            # Test window for coefficient tuning; the DB window reaches further
            # back to cover training data (repair_days on install, otherwise
            # update_add_train_days).
            coe_start = day_end - pd.Timedelta(days=self.opt.update_coe_days)
            day_start_db = coe_start - pd.Timedelta(days=self.opt.repair_days) if install is True else coe_start - pd.Timedelta(days=self.opt.update_add_train_days)
            # Never reach back before the full-capacity grid-connection date.
            if self.date_diff(day_start_db.date(), self.opt.authentication['full_cap']) < 0:
                day_start_db = datetime.datetime.strptime(self.opt.authentication['full_cap'], '%Y-%m-%d')
                self.logger.info("更新初始修模起始时间为全容量并网:{}".format(self.opt.authentication['full_cap']))
            db = DataBase(begin=day_start_db, end=day_end, opt=self.opt, logger=self.logger)
            db.data_process()
            self.opt.cap = db.opt.cap  # capacity may be refreshed by the DB layer
            self.args.save_args_yml(self.opt)
            formula = Formulas(self.opt)
            day = int(time.strftime('%d', time.localtime(start)))
            # Repair days within the month: every repair_model_cycle days
            # (e.g. cycle 10 -> [10, 20, 30]).
            repair, repairs = int(self.opt.repair_model_cycle), []
            terval = repair
            while repair <= 30:
                repairs.append(repair)
                repair += terval
            if day in repairs or install is True:
                # ------------------- model repair ------------------------
                self.repairing_model(day_start_db, coe_start)
                self.opt.authentication['repair'] = self.opt.authentication['date']
                self.args.save_args_yml(self.opt)
            self.logger.info("------------进入测试集自动计算-------------")
            coe_start += pd.Timedelta(days=1)
            nwp, env, dq, rp = self.material(coe_start, day_end)
            nwp = pd.merge(nwp, dq, on="C_TIME")
            if self.opt.full_field is False:
                nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE']]
            # Normalize NWP features with the stored training statistics.
            # NOTE(review): mean/std are positional lists aligned with the
            # remaining columns; opt.mean.get(x) may return None for an
            # unknown column — presumably the config guarantees coverage.
            mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']]
            std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']]
            _, _, nwp_features = self.normalize(nwp, ['C_TIME', 'C_REAL_VALUE'], mean=mean, std=std)
            env = pd.merge(env, dq, on='C_TIME')
            env = env.loc[:, self.opt.env_columns]
            mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
            std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
            _, _, env_features = self.normalize(env, mean=mean, std=std)
            data_test, env = self.process.get_test_data(nwp_features, env_features)
            test_X, test_Y, data_Y = self.features.get_test_data(data_test, env)
            result = self.fmi.predict(test_X, batch_size=8)
            # 2. history data: evaluate each of the 16 look-ahead points separately
            for point in range(0, 16, 1):
                # Collect row *point* of every per-sample frame into one table.
                dfs_point = []
                for i, df in enumerate(data_Y):
                    df["dq_fix"] = result[i]
                    dfs_point.append(df.iloc[point])
                pre_data = pd.concat(dfs_point, axis=1).T
                pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
                # De-normalize the network output back to power, then clamp
                # to [0, cap]; zero out where the short-term forecast is <= 0.
                pre_data['dq_fix'] = pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
                pre_data.loc[pre_data['C_FP_VALUE'].values <= 0, 'dq_fix'] = 0
                pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
                pre_data = pre_data.apply(pd.to_numeric, errors='ignore')
                pre_data['C_TIME'] = pd.to_datetime(pre_data['C_TIME'])
                pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].round(2)
                T = 'T' + str(point + 1)
                # Skip points whose coefficients are frozen in the config.
                if self.opt.coe[T]['update'] is False:
                    continue
                pre_data['history'] = self.fix.history_error_clock(pre_data, dq, 16 - point)
                self.logger.info("第{}点的时间周期为:{}-{}".format(T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
                best, best_coe_m, best_coe_n = 0, 0, 0
                cdq = pd.read_csv(current_path + '/data/cdq.csv', parse_dates=['C_TIME'])
                cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'])
                cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(point + 1)]
                cdq = pd.merge(cdq, pre_data, on='C_TIME')
                cdq['时间'] = cdq['C_TIME'].dt.strftime("%Y-%m-%d")
                # Hard-coded station metadata consumed only by the accuracy formula.
                cc = {
                    'formulaType': 'DAY_SHORT_ACCURACY',
                    'cap': '100',
                    'province': 'E46',
                    'electricType': 'E1',
                    'stationCode': 'J00629'
                }
                import argparse
                config = argparse.Namespace(**cc)
                cdqs = cdq.groupby('时间')
                # Per-day accuracies for: short-term, corrected, formula, history.
                dq_accs, dq_fix_accs, cdq_accs, his_accs = [], [], [], []
                for dt, group in cdqs:
                    dq_acc = self.cal_acc(group, 'C_FP_VALUE', config)
                    dq_fix_acc = self.cal_acc(group, 'dq_fix', config)
                    cdq_acc = self.cal_acc(group, 'C_ABLE_VALUE', config)
                    his_acc = self.cal_acc(group, 'history', config)
                    dq_accs.append(dq_acc)
                    dq_fix_accs.append(dq_fix_acc)
                    cdq_accs.append(cdq_acc)
                    his_accs.append(his_acc)
                    print(dt, "这一天, 短期准确率:", dq_acc, "超短期公式准确率:", cdq_acc, "神经网络准确率:", dq_fix_acc, "历史功率准确率:", his_acc)
                # Exhaustive grid search over blending weights m = i/170,
                # n = j/170 maximizing the south-grid accuracy formula.
                for i in range(5, 210):
                    for j in range(5, 210):
                        cdq["new"] = round(i / 170 * cdq['dq_fix'] + j / 170 * cdq['history'], 2)
                        acc = formula.calculate_acc_south(cdq['C_REAL_VALUE'].values, cdq['new'].values)
                        if acc > best:
                            best = acc
                            best_coe_m = i / 170
                            best_coe_n = j / 170
                cdq['new'] = round(best_coe_m * cdq['dq_fix'] + best_coe_n * cdq['history'], 2)
                cdq.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
                self.logger.info(
                    "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                        str(self.opt.update_coe_days), np.mean(dq_accs), T, best, np.mean(cdq_accs),
                        np.mean(dq_fix_accs), np.mean(his_accs)))
                # Persist the winning coefficients for this forecast point.
                self.opt.coe[T]['m'] = round(best_coe_m, 3)
                self.opt.coe[T]['n'] = round(best_coe_n, 3)
                self.args.save_args_yml(self.opt)
            end = time.time()
            self.logger.info("定时任务:每天自动确认系数,用了 %s 秒 " % (end - start))
        except Exception as e:
            self.logger.critical("定时任务出错:{}".format(e.args))
    def repairing_model(self, day_start_db, day_end):
        """Re-train ("repair") the FMI neural network on [day_start_db, day_end].

        Loads aligned material, recomputes normalization stats (via
        material(is_repair=True)), normalizes NWP/env features, records the
        resulting column lists and input sizes on self.opt, then trains.
        """
        self.logger.info("------------进入FMI神经网络修模-------------")
        nwp, env, dq, rp = self.material(day_start_db, day_end, is_repair=True)
        nwp = pd.merge(nwp, dq, on="C_TIME")
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns+['C_REAL_VALUE']]
        # Positional mean/std lists for the non-time columns (C_REAL_VALUE is
        # normalized here too, unlike in the test path of calculate_coe).
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        _, _, nwp_features = self.normalize(nwp, mean=mean, std=std)
        env = pd.merge(env, dq, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        # Remember the exact feature layout used for training.
        self.opt.nwp_columns = nwp_features.columns.to_list()
        self.opt.env_columns = env_features.columns.to_list()
        if 'C_REAL_VALUE' in self.opt.nwp_columns:
            self.opt.nwp_columns.pop(self.opt.nwp_columns.index('C_REAL_VALUE'))
        # NOTE(review): the "- 1" presumably discounts the C_TIME column from
        # each list — confirm against the model's expected input sizes.
        self.opt.Model["input_size_nwp"] = len(self.opt.nwp_columns) - 1
        self.opt.Model["input_size_env"] = len(self.opt.env_columns) - 1
        self.update_property()
        data_train, env = self.process.get_train_data(nwp_features, env_features)
        train_X, valid_X, train_Y, valid_Y = self.features.get_train_data(data_train, env)
        # train_Y = [np.array([y[:, 0] for y in train_Y])]
        # valid_Y = [np.array([y[:, 0] for y in valid_Y])]
        self.fmi.training(self.opt, [train_X, train_Y, valid_X, valid_Y])
  195. def update_property(self):
  196. self.va.set_lp()
  197. self.process.opt = self.opt
  198. self.features.opt = self.opt
  199. self.fix.opt = self.opt
    def material(self, begin, end, is_repair=False):
        """Load and align NWP / tower-env / short-term-forecast / power CSVs
        for the window [begin, end].

        Args:
            begin, end: datetime bounds; *begin* is widened backwards by
                Model["his_points"]/4 hours so history features have context.
            is_repair: True when preparing training data for model repair;
                also triggers recomputation of the stored mean/std statistics.

        Returns:
            (nwp, env, dq, rp) DataFrames restricted to the window.
        """
        # History window starts his_points quarter-hours before *begin*.
        his_begin = (begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)).strftime('%Y-%m-%d %H:%M:%S')
        begin, end = begin.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')
        if is_repair is True:
            self.logger.info("修模的起始时间为:{}-{}".format(begin, end))
        else:
            self.logger.info("确认系数的起止时间为:{}-{}".format(begin, end))
        plt_name = '-训练集-'+begin + '-' + end if is_repair is True else '-测试集-'+begin + '-' + end
        nwp = pd.read_csv(current_path + '/data/NWP.csv', parse_dates=['C_TIME'])
        env = pd.read_csv(current_path + '/data/tower-1-process.csv', parse_dates=['C_TIME'])
        rp = pd.read_csv(current_path + '/data/power.csv', parse_dates=['C_TIME'])
        dq = pd.read_csv(current_path + '/data/dq.csv', parse_dates=['C_TIME'])
        rp['C_TIME'] = pd.to_datetime(rp['C_TIME'])
        rp.set_index('C_TIME', inplace=True)
        rp = rp.loc[his_begin: end].reset_index(inplace=False)
        if self.target == 'C_ABLE_VALUE':
            # Model against "able" power: expose it as C_REAL_VALUE downstream.
            rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
            rp.rename(columns={'C_ABLE_VALUE': 'C_REAL_VALUE'}, inplace=True)
        else:
            # Clean curtailment-limited samples out of the real power series.
            rp = pd.merge(rp, env.loc[:, ['C_TIME'] + [self.opt.usable_power["env"]]], on='C_TIME')
            lp = LimitPower(self.logger, self.args, rp)
            if self.opt.usable_power["clean_power_by_signal"]:
                rp_signal = lp.clean_limited_power_by_signal(plt_name)
            if self.opt.usable_power["clean_power_by_wind"]:
                rp_wind = lp.clean_limited_power(plt_name, cluster=is_repair)
            # NOTE(review): if clean_power_which selects a series whose
            # clean_power_by_* flag is off, rp_signal/rp_wind is unbound and
            # this raises NameError — confirm the config guarantees consistency.
            if self.opt.usable_power["clean_power_which"] == 0:
                rp = rp_signal
            elif self.opt.usable_power["clean_power_which"] == 1:
                rp = rp_wind
            elif self.opt.usable_power["clean_power_which"] == 2:
                # Keep only timestamps both cleaners agree on.
                time_intersection = set(rp_signal['C_TIME'])
                time_intersection.intersection_update(rp_wind['C_TIME'])
                rp = rp_wind[rp_wind['C_TIME'].isin(time_intersection)]
            rp = rp[['C_TIME', 'C_REAL_VALUE']]
        rp.loc[:, 'C_REAL_VALUE'] = rp.loc[:, 'C_REAL_VALUE'].apply(pd.to_numeric)
        # Restrict the short-term forecast to timestamps with usable power.
        dq = pd.merge(dq, rp, on='C_TIME')
        dq = dq[['C_TIME', 'C_FP_VALUE', 'C_REAL_VALUE']]
        dq.set_index('C_TIME', inplace=True)
        dq = dq.loc[his_begin: end].reset_index(inplace=False)
        nwp.set_index('C_TIME', inplace=True)
        nwp = nwp.loc[begin: end].reset_index()
        env.set_index('C_TIME', inplace=True)
        env = env.loc[his_begin: end].reset_index()
        # (legacy filtering/splitting code kept for reference)
        # envs = []
        # for index, row in env.iterrows():
        #     if row['C_TIME'].minute in (00, 15, 30, 45):
        #         envs.append(row)
        # if len(envs) > 0:
        #     env = pd.concat(envs, axis=1).T
        #     env.reset_index(drop=True, inplace=True)
        # else:
        #     raise ArithmeticError("测风塔数据为空")
        # dqs = self.data.splite_data(dq)
        # nwps = self.data.splite_data(nwp)
        if is_repair:
            # Recompute and persist per-column normalization statistics.
            mean, std = {}, {}
            for df in [nwp, env, dq, rp]:
                m, s, _ = self.normalize(df)
                mean.update(m)
                std.update(s)
            self.opt.mean = {k: float(v) for k, v in mean.items()}
            self.opt.std = {k: float(v) for k, v in std.items()}
        return nwp, env, dq, rp
  263. def saveVar(self, path, data):
  264. os.makedirs(os.path.dirname(path), exist_ok=True)
  265. with open(path, 'wb') as file:
  266. pickle.dump(data, file)
  267. def normalize(self, df, drop_list=['C_TIME'], mean=None, std=None):
  268. df1 = df.copy()
  269. drop_index = [list(df1).index(x) for x in drop_list]
  270. df1.drop(drop_list, axis=1, inplace=True, errors='ignore')
  271. df1 = df1.apply(pd.to_numeric, errors='ignore')
  272. if mean is None or std is None:
  273. mean = np.mean(df1, axis=0).round(3) # 数据的均值
  274. std = np.std(df1, axis=0).round(3) # 标准差
  275. new = []
  276. for i, row in df1.iterrows():
  277. d = (row - mean) / std # 归一化
  278. new.append(d.round(3))
  279. if len(new) > 0:
  280. new = pd.concat(new, axis=1).T
  281. for index, col_name in zip(drop_index, drop_list):
  282. new.insert(index, col_name, df[col_name].values)
  283. return mean, std, new
if __name__ == '__main__':
    # Ad-hoc manual entry point for running the clock outside the scheduler.
    from config import myargparse
    from error import dqFix
    from process import preData
    from detect import Detection
    from logs import Log
    args = myargparse(discription="场站端配置", add_help=False)
    data = preData(args)
    detect = Detection(args)
    log = Log()
    fix = dqFix(args=args)
    # NOTE(review): Clock.__init__ takes (logger, args, va, process, features,
    # fmi, fix) — this call passes only five arguments and would raise
    # TypeError. Confirm the intended wiring (fmi/features are missing).
    clock = Clock(log, args, data, detect, fix)
    # clock.calculate_coe(cluster=True)
    # NOTE(review): no clustering() method is defined on Clock in this file —
    # this call looks stale; verify against the version this driver targets.
    clock.clustering()