  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/11/23 14:07
  4. # file: clocking.py
  5. # author: David
  6. # company: shenyang JY
  7. import pandas as pd
  8. import threading
  9. import datetime
  10. from cache.limited_power import LimitPower
  11. from apscheduler.schedulers.background import BackgroundScheduler
  12. import time
  13. import os
  14. import pickle
  15. import numpy as np
  16. from datetime import timedelta
  17. np.random.seed(42)
  18. from cache.calculate import calculate_acc
  19. from cache.formula import Formulas, Assessment
  20. from cache.inputData import DataBase
  21. from cache.monitor import Monitor
  22. current_path = os.path.dirname(__file__)
  23. from pytz import timezone
class Clock(object):
    """Orchestrates periodic model repair (修模) and coefficient re-tuning.

    Wires together the injected collaborators: data processing, feature
    extraction, the FMI neural network, and the history-error fixer, and
    schedules the nightly recalculation job.
    """

    def __init__(self, logger, args, process, features, fmi, fix):
        self.logger = logger
        self.args = args              # config holder: parse_args_and_yaml() / save_args_yml()
        self.process = process        # builds train/test data frames
        self.features = features      # windows data into model inputs
        self.fmi = fmi                # NN model: training() / predict()
        self.fix = fix                # history-error correction: history_error_clock()
        self.opt = self.args.parse_args_and_yaml()
        self.mo = Monitor(logger, args)
        # Column used as the fitting target (e.g. real vs. able power).
        self.target = self.opt.predict
        self.logger.info("---以 {} 进行修模---".format(self.target))
        self.lp = LimitPower(self.logger, self.args, None)
  37. def update_thread(self):
  38. thread = threading.Thread(target=self.start_jobs)
  39. thread.start()
    def start_jobs(self):
        # Registers jobs and returns immediately; APScheduler runs them on
        # its own worker threads.
        scheduler = BackgroundScheduler()
        scheduler.configure({'timezone': timezone("Asia/Shanghai")})
        # Nightly at 23:00 Asia/Shanghai: retune coefficients / retrain model.
        scheduler.add_job(func=self.calculate_coe, trigger="cron", hour=23, minute=0)
        # Every 60 s: let the monitor pick up external config changes.
        scheduler.add_job(func=self.mo.update_config, trigger="interval", seconds=60)
        scheduler.start()
  46. def date_diff(self, current_dt, repair_dt):
  47. difference = (current_dt - datetime.datetime.strptime(repair_dt, '%Y-%m-%d').date())
  48. return difference.days
  49. def cal_acc(self, df, target, opt):
  50. df = df.copy()
  51. df.rename(columns={'C_REAL_VALUE': 'realValue'}, inplace=True)
  52. df['ableValue'] = df['realValue']
  53. df['forecastAbleValue'] = df[target]
  54. df = df.apply(pd.to_numeric, errors='ignore')
  55. df['C_TIME'] = pd.to_datetime(df['C_TIME'])
  56. acc = calculate_acc(df, opt=opt)
  57. return acc
    def calculate_coe(self, install=False):
        """Nightly job: optionally retrain the FMI model, then re-tune the
        T1..T16 mixing coefficients on a sliding evaluation window.

        install=True forces a retrain and pulls the longer `repair_days`
        history when fetching data.

        NOTE(review): indentation was reconstructed from a whitespace-mangled
        source; block boundaries below are the most plausible reading.
        """
        try:
            start = time.time()
            self.logger.info("检测系统当前的时间为:{}".format(time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start))))
            # Re-read config so manual yml edits take effect between runs.
            self.opt = self.args.parse_args_and_yaml()
            if self.opt.algorithm_platform['switch']:
                # Platform mode: advance the authenticated date by one day.
                dt = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
                dt = dt + timedelta(days=1)
                self.opt.authentication['date'] = dt.strftime('%Y-%m-%d')
            else:
                self.opt.authentication['date'] = time.strftime('%Y-%m-%d', time.localtime(start))
            day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
            coe_start = day_end - pd.Timedelta(days=self.opt.update_coe_days)
            # Data window: long history on install, incremental window otherwise.
            day_start_db = coe_start - pd.Timedelta(days=self.opt.repair_days) if install is True else coe_start - pd.Timedelta(days=self.opt.update_add_train_days)
            if self.date_diff(day_start_db.date(), self.opt.authentication['full_cap']) < 0:
                # Never reach back before full-capacity grid connection.
                day_start_db = datetime.datetime.strptime(self.opt.authentication['full_cap'], '%Y-%m-%d')
                self.logger.info("更新初始修模起始时间为全容量并网:{}".format(self.opt.authentication['full_cap']))
            db = DataBase(begin=day_start_db, end=day_end, opt=self.opt, logger=self.logger)
            db.data_process()
            self.opt.cap = db.opt.cap  # capacity may be refreshed by data_process
            self.args.save_args_yml(self.opt)
            self.lp.opt = self.opt
            formula = Formulas(self.opt)
            assess = Assessment(self.opt, self.logger)
            # Retrain on days-of-month that are multiples of repair_model_cycle
            # (e.g. cycle=5 -> days 5, 10, ..., 30), or always on install.
            day = int(time.strftime('%d', time.localtime(start)))
            repair, repairs = int(self.opt.repair_model_cycle), []
            terval = repair
            while repair <= 30:
                repairs.append(repair)
                repair += terval
            if day in repairs or install is True:
                # ------------------- model repair (修模) ------------------------
                self.repairing_model(day_start_db, coe_start)
                self.opt.authentication['repair'] = self.opt.authentication['date']
                self.args.save_args_yml(self.opt)
            self.logger.info("------------进入测试集自动计算-------------")
            coe_start += pd.Timedelta(days=1)
            nwp, env, dq, rp, rp_his = self.material(coe_start, day_end)
            # Record the day's first positive-power (sunrise) point for the
            # downstream first-point correction.
            rp_his.set_index("C_TIME", inplace=True)
            last_day_rps = rp_his.loc[self.opt.authentication['date'], self.target].values
            last_day_dt = rp_his.loc[self.opt.authentication['date'], self.target].index
            rp_his.reset_index(drop=False, inplace=True)
            sun_up = last_day_rps > 0
            if np.any(sun_up):
                sun_up_i = np.argmax(sun_up)  # index of first True = sunrise
                self.opt.first_point['sun_up_time'] = last_day_dt[sun_up_i].strftime('%Y-%m-%d %H:%M:%S')
                self.opt.first_point['sun_up_value'] = float(last_day_rps[sun_up_i])
            # ---- normalized NWP features (label column excluded from scaling) ----
            nwp = pd.merge(nwp, dq, on='C_TIME')
            if self.opt.full_field is False:
                nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE']]
            mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']]
            std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']]
            _, _, nwp_features = self.normalize(nwp, ['C_TIME', 'C_REAL_VALUE'], mean=mean, std=std)
            # ---- normalized environment features ----
            env = pd.merge(env, rp_his, on='C_TIME')
            env = env.loc[:, self.opt.env_columns]
            mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
            std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
            _, _, env_features = self.normalize(env, mean=mean, std=std)
            data_test, env = self.process.get_test_data(nwp_features, env_features)
            test_X, test_Y, data_Y = self.features.get_test_data(data_test, env)
            result = self.fmi.predict(test_X, batch_size=8)
            # 2. Per-horizon (T1..T16) coefficient search on history.
            for point in range(0, 16, 1):
                # Gather the `point`-th row of every window: one frame per horizon.
                dfs_point = []
                for i, df in enumerate(data_Y):
                    df["dq_fix"] = result[i]
                    dfs_point.append(df.iloc[point])
                pre_data = pd.concat(dfs_point, axis=1).T
                pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].apply(pd.to_numeric, errors='ignore')
                pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
                # De-normalize the NN output back to power, then clamp to [0, cap].
                pre_data['dq_fix'] = pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
                pre_data['dq_fix'] = pre_data['dq_fix'].round(2)
                pre_data.loc[pre_data['C_FP_VALUE'].values == 0, 'dq_fix'] = 0
                pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
                pre_data.loc[pre_data['dq_fix'] < 0, 'dq_fix'] = 0
                T = 'T' + str(point + 1)
                if self.opt.coe[T]['update'] is False:
                    continue  # this horizon is frozen in config
                pre_data['history'] = self.fix.history_error_clock(pre_data, rp_his, 16 - point - 1)
                self.logger.info("第{}点的时间周期为:{}-{}".format(T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
                self.update_coe(pre_data, assess, formula, point, test=False)
            self.args.save_args_yml(self.opt)
            if self.opt.algorithm_platform['switch']:
                dt = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
                dt = dt + timedelta(days=1)
                self.opt.authentication['date'] = dt.strftime('%Y-%m-%d')
                self.algorithm_platform()
            end = time.time()
            self.logger.info("定时任务:周期频率修模,用了 %.2f 秒 " % (end - start))
        except Exception as e:
            self.logger.critical("定时任务出错:{}".format(e.args))
    def repairing_model(self, day_start, day_end):
        """Retrain (修模) the FMI network on data from [day_start, day_end]."""
        self.logger.info("-----进入FMI神经网络修模-----")
        # is_repair=True makes material() recompute opt.mean/opt.std.
        nwp, env, dq, rp, rp_his = self.material(day_start, day_end, is_repair=True)
        nwp = pd.merge(nwp, dq, on='C_TIME')
        if self.opt.full_field is False:
            nwp = nwp[self.opt.nwp_columns+['C_REAL_VALUE']]
        # Unlike the test path, C_REAL_VALUE is normalized too here — it is
        # the training label.
        mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME']]
        _, _, nwp_features = self.normalize(nwp, mean=mean, std=std)
        env = pd.merge(env, rp_his, on='C_TIME')
        env = env.loc[:, self.opt.env_columns]
        mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
        _, _, env_features = self.normalize(env, mean=mean, std=std)
        # Persist the actually-used column lists and derived input sizes.
        self.opt.nwp_columns = nwp_features.columns.to_list()
        self.opt.env_columns = env_features.columns.to_list()
        if 'C_REAL_VALUE' in self.opt.nwp_columns:
            self.opt.nwp_columns.pop(self.opt.nwp_columns.index('C_REAL_VALUE'))
        self.opt.Model["input_size_nwp"] = len(self.opt.nwp_columns) - 1  # minus C_TIME
        self.opt.Model["input_size_env"] = len(self.opt.env_columns) - 1  # minus C_TIME
        self.update_property()
        data_train, env = self.process.get_train_data(nwp_features, env_features)
        train_X, valid_X, train_Y, valid_Y = self.features.get_train_data(data_train, env)
        self.fmi.training(self.opt, [train_X, train_Y, valid_X, valid_Y])
    def update_coe(self, pre_data, assess, formula, point, test=False):
        """Grid-search the mixing coefficients (m, n) for horizon T{point+1}.

        The combined forecast is new = m*dq_fix + n*history with m, n scanned
        over (5..209)/170 (~0.03..1.23). Chosen coefficients are written to
        self.opt.coe[T] and the evaluated frame is dumped to
        data/测试集{point+1}.csv. Returns None.
        """
        cdq = pd.read_csv(current_path + '/data/cdq.csv', parse_dates=['C_TIME'])
        cdq['C_TIME'] = pd.to_datetime(cdq['C_TIME'])
        # Keep only rows forecast exactly point+1 steps ahead.
        cdq = cdq[cdq['C_FORECAST_HOW_LONG_AGO'] == int(point + 1)]
        pre_data = pd.merge(cdq, pre_data, on='C_TIME')
        T = 'T' + str(point + 1)
        if test is False:
            # ---- pass 1: maximize accuracy over the (m, n) grid ----
            best_acc, best, best_coe_m, best_coe_n = 0, 0, 0, 0
            region = self.opt.Model['region']
            # Baselines for logging: short-term, NN fix, history fix, CDQ formula.
            dq_acc, dq_score = assess.electricity_solar_cdq(pre_data, region, 'C_FP_VALUE')
            dq_fix_acc, dq_fix_score = assess.electricity_solar_cdq(pre_data, region, 'dq_fix')
            his_fix_acc, his_fix_score = assess.electricity_solar_cdq(pre_data, region, 'history')
            cdq_acc, cdq_score = assess.electricity_solar_cdq(pre_data, region, 'C_ABLE_VALUE')
            for i in range(5, 210):
                for j in range(5, 210):
                    pre_data["new"] = round(i / 170 * pre_data['dq_fix'] + j / 170 * pre_data['history'], 3)
                    acc, acc_score = assess.electricity_solar_cdq(pre_data, region, 'new', output=False)
                    if acc > best_acc:
                        best_acc = acc
                        best = acc_score
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            self.logger.info(
                "1.过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期的准确率:{:.4f}, 超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_acc, T, best_acc, cdq_acc, dq_fix_acc, his_fix_acc))
            self.logger.info(
                "2.过去{}天的短期的考核分:{:.4f},自动确认系数后,{} 超短期的考核分:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_score, T, best, cdq_score, dq_fix_score, his_fix_score))
            # ---- pass 2: minimize the assessment penalty on the same grid ----
            best_score, best, best_score_m, best_score_n = 999, 0, 0, 0
            for i in range(210, 5, -1):
                for j in range(210, 5, -1):
                    pre_data["new"] = round(i / 170 * pre_data['dq_fix'] + j / 170 * pre_data['history'], 3)
                    acc, acc_score = assess.electricity_solar_cdq(pre_data, region, 'new', output=False)
                    if acc_score < best_score:
                        best_score = acc_score
                        best = acc
                        best_score_m = i / 170
                        best_score_n = j / 170
            self.logger.info(
                "3.过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期的准确率:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_acc, T, best, cdq_acc, dq_fix_acc, his_fix_acc))
            self.logger.info(
                "4.过去{}天的短期的考核分:{:.4f},自动确认系数后,{} 超短期的考核分:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), dq_score, T, best_score, cdq_score, dq_fix_score, his_fix_score))
            # Persist whichever criterion this horizon is configured to use.
            if self.opt.coe[T]['score']:
                self.opt.coe[T]['m'] = round(best_score_m, 3)
                self.opt.coe[T]['n'] = round(best_score_n, 3)
                pre_data["new"] = round(best_score_m * pre_data['dq_fix'] + best_score_n * pre_data['history'], 3)
            else:
                self.opt.coe[T]['m'] = round(best_coe_m, 3)
                self.opt.coe[T]['n'] = round(best_coe_n, 3)
                pre_data["new"] = round(best_coe_m * pre_data['dq_fix'] + best_coe_n * pre_data['history'], 3)
            pre_data.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
        else:
            # ---- test mode: per-day report, then accuracy search on cdq ----
            best, best_coe_m, best_coe_n = 0, 0, 0
            pre_data['时间'] = pre_data['C_TIME'].dt.strftime("%Y-%m-%d")
            cc = {
                'formulaType': 'DAY_SHORT_ACCURACY',
                'cap': '100',
                'province': 'E46',
                'electricType': 'E1',
                'stationCode': 'J00629'
            }
            import argparse
            config = argparse.Namespace(**cc)
            cdqs = pre_data.groupby('时间')
            dq_accs, dq_fix_accs, cdq_accs, his_accs = [], [], [], []
            for dt, group in cdqs:
                dq_acc = self.cal_acc(group, 'C_FP_VALUE', config)
                dq_fix_acc = self.cal_acc(group, 'dq_fix', config)
                cdq_acc = self.cal_acc(group, 'C_ABLE_VALUE', config)
                his_acc = self.cal_acc(group, 'history', config)
                dq_accs.append(dq_acc)
                dq_fix_accs.append(dq_fix_acc)
                cdq_accs.append(cdq_acc)
                his_accs.append(his_acc)
                print(dt, "这一天, 短期准确率:", dq_acc, "超短期公式准确率:", cdq_acc, "神经网络准确率:", dq_fix_acc,
                      "历史功率准确率:", his_acc)
            # NOTE(review): this search runs on the raw `cdq` frame, which only
            # has 'dq_fix'/'history' columns if cdq.csv provides them — verify
            # before enabling test=True.
            for i in range(5, 210):
                for j in range(5, 210):
                    cdq["new"] = round(i / 170 * cdq['dq_fix'] + j / 170 * cdq['history'], 2)
                    acc = formula.calculate_acc_northeast(cdq['C_REAL_VALUE'].values, cdq['new'].values)
                    if acc > best:
                        best = acc
                        best_coe_m = i / 170
                        best_coe_n = j / 170
            cdq['new'] = round(best_coe_m * cdq['dq_fix'] + best_coe_n * cdq['history'], 2)
            cdq.to_csv(current_path + '/data/测试集{}.csv'.format(point + 1), index=False)
            self.logger.info(
                "过去{}天的短期的准确率:{:.4f},自动确认系数后,{} 超短期:{:.4f},超短期公式:{:.4f},神经网络:{:.4f},历史功率:{:.4f}".format(
                    str(self.opt.update_coe_days), np.mean(dq_accs), T, best, np.mean(cdq_accs), np.mean(dq_fix_accs),
                    np.mean(his_accs)))
            self.opt.coe[T]['m'] = round(best_coe_m, 3)
            self.opt.coe[T]['n'] = round(best_coe_n, 3)
    def algorithm_platform(self):
        """One-day evaluation run that pushes per-horizon forecasts to MongoDB
        for the external algorithm platform.

        NOTE(review): update_coe() returns None, yet its result is assigned to
        pre_data and indexed below; likewise 'coe-acc'/'coe-ass' columns are
        never created upstream. As written each horizon iteration raises and
        is swallowed by the broad except — confirm intended behavior before
        relying on this path.
        """
        try:
            start = time.time()
            day_end = datetime.datetime.strptime(self.opt.authentication['date'], '%Y-%m-%d')
            day_start = day_end - pd.Timedelta(days=1)
            db = DataBase(begin=day_start, end=day_end, opt=self.opt, logger=self.logger)
            db.data_process()
            formula = Formulas(self.opt)
            assess = Assessment(self.opt, self.logger)
            self.logger.info("------------进入测试集自动计算-------------")
            nwp, env, dq, rp, rp_his = self.material(day_start, day_end)
            nwp = pd.merge(nwp, dq, on="C_TIME")
            if self.opt.full_field is False:
                nwp = nwp[self.opt.nwp_columns + ['C_REAL_VALUE']]
            mean = [self.opt.mean.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']]
            std = [self.opt.std.get(x) for x in nwp.columns.to_list() if x not in ['C_TIME', 'C_REAL_VALUE']]
            _, _, nwp_features = self.normalize(nwp, ['C_TIME', 'C_REAL_VALUE'], mean=mean, std=std)
            env = pd.merge(env, rp_his, on='C_TIME')
            env = env.loc[:, self.opt.env_columns]
            mean = [self.opt.mean.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
            std = [self.opt.std.get(x) for x in env.columns.to_list() if x not in ['C_TIME']]
            _, _, env_features = self.normalize(env, mean=mean, std=std)
            data_test, env = self.process.get_test_data(nwp_features, env_features)
            # NOTE(review): calculate_coe unpacks three values from this call;
            # confirm which signature features.get_test_data actually has.
            test_X, data_Y = self.features.get_test_data(data_test, env)
            result = self.fmi.predict(test_X, batch_size=8)
            mongo_data = []
            # 2. Per-horizon frames assembled from window rows.
            for point in range(0, 16, 1):
                dfs_point = []
                for i, df in enumerate(data_Y):
                    df["dq_fix"] = result[i]
                    dfs_point.append(df.iloc[point])
                pre_data = pd.concat(dfs_point, axis=1).T
                pre_data[["C_REAL_VALUE", "dq_fix"]] = pre_data[["C_REAL_VALUE", "dq_fix"]].apply(pd.to_numeric,
                                                                                                  errors='ignore')
                pre_data = pd.merge(pre_data, dq[['C_TIME', 'C_FP_VALUE']], on='C_TIME')
                # De-normalize NN output and clamp to [0, cap].
                pre_data['dq_fix'] = pre_data['dq_fix'] * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
                pre_data['dq_fix'] = pre_data['dq_fix'].round(2)
                pre_data.loc[pre_data['dq_fix'] > self.opt.cap, 'dq_fix'] = self.opt.cap
                pre_data.loc[pre_data['dq_fix'] < 0, 'dq_fix'] = 0
                T = 'T' + str(point + 1)
                if self.opt.coe[T]['update'] is False:
                    continue
                pre_data['history'] = self.fix.history_error_clock(pre_data, rp_his, 16 - point - 1)
                self.logger.info("第{}点的时间周期为:{}-{}".format(T, pre_data['C_TIME'][0], pre_data['C_TIME'].iloc[-1]))
                pre_data = self.update_coe(pre_data, assess, formula, point, test=False)
                pre_data['howLongAgo'] = point + 1
                pre_data = pre_data.loc[:, ['C_TIME', 'dq_fix', 'C_FP_VALUE', 'history', 'coe-acc', 'coe-ass', 'howLongAgo']]
                # Long format: one row per (timestamp, model) for Mongo ingestion.
                df_melted = pre_data.melt(id_vars=['C_TIME', 'howLongAgo'], var_name='model', value_name='power_forecast')
                df_melted['farm_id'] = self.opt.algorithm_platform['farm_id']
                mongo_data.append(df_melted)
            from cache.mongo import insert_data_into_mongo
            mongo_data = pd.concat(mongo_data, axis=0)
            insert_data_into_mongo(mongo_data, self.opt.algorithm_platform)
            end = time.time()
            self.logger.info("算法平台测试,用了 %s 秒 " % (end - start))
        except Exception as e:
            self.logger.critical("算法平台定时任务出错:{}".format(e.args))
  325. def update_property(self):
  326. self.process.opt = self.opt
  327. self.features.opt = self.opt
  328. self.fix.opt = self.opt
    def material(self, begin, end, is_repair=False):
        """Load and clean NWP / weather / short-term-forecast / power data for
        the window [begin, end].

        Returns (nwp, env, dq, rp, rp_his). With is_repair=True the
        normalization statistics (opt.mean / opt.std) are recomputed from the
        loaded frames.

        NOTE(review): indentation reconstructed from a mangled source; the
        placement of the final rp column selection inside the else-branch is
        the reading most consistent with the surrounding merges.
        """
        # History starts his_points/4 hours earlier so the error-correction
        # window has enough past power samples (assumes 15-min points — TODO confirm).
        his_begin = (begin - pd.Timedelta(hours=self.opt.Model["his_points"]/4)).strftime('%Y-%m-%d %H:%M:%S')
        begin, end = begin.strftime('%Y-%m-%d'), end.strftime('%Y-%m-%d')
        if is_repair is True:
            self.logger.info("进入修模的起止时间为:{}-{}".format(begin, end))
        else:
            self.logger.info("进入测试集的起止时间为:{}-{}".format(begin, end))
        nwp = pd.read_csv(current_path + '/data/NWP.csv', parse_dates=['C_TIME'])
        rp = pd.read_csv(current_path + '/data/power.csv', parse_dates=['C_TIME'])
        dq = pd.read_csv(current_path + '/data/dq.csv', parse_dates=['C_TIME'])
        env = pd.read_csv(current_path + '/data/weather-{}-process.csv'.format(self.opt.weatherloc[0]), parse_dates=['C_TIME'])
        rp['C_TIME'] = pd.to_datetime(rp['C_TIME'])
        rp.set_index('C_TIME', inplace=True)
        rp = rp.loc[his_begin: end].reset_index(inplace=False)
        rp_his = rp.copy()
        if self.opt.Model['fusion'] is False:
            # No sensor fusion: fabricate random placeholders for every env
            # column except the usable-power driver, which is kept real.
            env_fill = pd.DataFrame({'C_TIME': rp['C_TIME']})
            for col in self.opt.env_columns:
                if col not in ['C_TIME', 'C_REAL_VALUE', 'C_FP_VALUE', self.opt.usable_power['env']]:
                    env_fill[col] = np.random.rand(len(env_fill))
            env = pd.merge(env_fill, env.loc[:, ['C_TIME', self.opt.usable_power['env']]], on='C_TIME', how='left')
            env = env.fillna(method='ffill')
            env = env.fillna(method='bfill')
        if self.target == 'C_ABLE_VALUE':
            # Fit against theoretical (able) power: treat it as the real value.
            # NOTE(review): the later rp_his merge selects C_ABLE_VALUE, which
            # no longer exists after this rename — verify this mode is used.
            rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
            rp.rename(columns={'C_ABLE_VALUE': 'C_REAL_VALUE'}, inplace=True)
            rp_his = rp.copy()
        else:
            plt_name = '-训练集-' + begin + '-' + end if is_repair is True else '-测试集-' + begin + '-' + end
            weather_power = pd.merge(env, rp, on='C_TIME')
            self.lp.weather_power = weather_power
            # Curtailment (limited-power) cleaning strategy selector:
            if self.opt.usable_power["clean_power_which"] == 0:
                # 0: clean by dispatch signal only.
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp = rp_signal
            elif self.opt.usable_power["clean_power_which"] == 1:
                # 1: clean by the solar physics model only.
                rp_solar = self.lp.clean_limited_power(plt_name, is_repair=is_repair)
                rp = rp_solar
                if is_repair is True:
                    # Keep the fitted model parameters for later runs.
                    self.opt.usable_power['k'] = self.lp.opt.usable_power['k']
                    self.opt.usable_power['bias'] = self.lp.opt.usable_power['bias']
            elif self.opt.usable_power["clean_power_which"] == 2:
                # 2: intersection of signal- and solar-based cleaning.
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                rp_solar = self.lp.clean_limited_power(plt_name, is_repair=is_repair)
                time_intersection = set(rp_signal['C_TIME'])
                time_intersection.intersection_update(rp_solar['C_TIME'])
                rp = rp_solar[rp_solar['C_TIME'].isin(time_intersection)]
                if is_repair is True:
                    self.opt.usable_power['k'] = self.lp.opt.usable_power['k']
                    self.opt.usable_power['bias'] = self.lp.opt.usable_power['bias']
            elif self.opt.usable_power["clean_power_which"] == 3:
                # 3: keep samples whose reference-vs-real gap is < 20% of cap.
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp = rp[rp['diff'] < 0.2*self.opt.cap]
            elif self.opt.usable_power["clean_power_which"] == 4:
                # 4: intersection of sample-reference and signal cleaning.
                rp['diff'] = rp['C_REFERENCE_POWER_BY_SAMPLE'] - rp['C_REAL_VALUE']
                rp_sample = rp[rp['diff'] < 0.2 * self.opt.cap]
                rp_signal = self.lp.clean_limited_power_by_signal(plt_name)
                time_intersection = set(rp_sample['C_TIME'])
                time_intersection.intersection_update(rp_signal['C_TIME'])
                rp = rp_sample[rp_sample['C_TIME'].isin(time_intersection)]
            else:
                self.logger.info("不进行限电清洗")
            rp = rp.loc[:, ['C_TIME', 'C_REAL_VALUE']]
            rp['C_REAL_VALUE'] = rp['C_REAL_VALUE'].apply(pd.to_numeric)
        # Rebuild the uncleaned history: fall back to able-power where the
        # cleaned real value was dropped.
        rp_his = pd.merge(rp_his.loc[:, ['C_TIME', 'C_ABLE_VALUE']], rp, on='C_TIME', how='left')
        rp_his['C_REAL_VALUE'] = rp_his['C_REAL_VALUE'].fillna(rp_his['C_ABLE_VALUE'])
        rp_his = pd.merge(rp_his, dq, on='C_TIME')
        rp_his = rp_his.loc[:, ['C_TIME', 'C_FP_VALUE', 'C_ABLE_VALUE', 'C_REAL_VALUE']]
        # dq restricted to timestamps that survived cleaning.
        dq = rp_his[rp_his['C_TIME'].isin(rp['C_TIME'])].copy()
        dq.drop(columns=['C_ABLE_VALUE'], inplace=True)
        nwp.set_index('C_TIME', inplace=True)
        nwp = nwp.loc[begin: end].reset_index()
        env.set_index('C_TIME', inplace=True)
        env = env.loc[his_begin: end].reset_index()
        if is_repair:
            # Refresh normalization statistics from the training material.
            mean, std = {}, {}
            for df in [nwp, env, dq, rp]:
                m, s, _ = self.normalize(df)
                mean.update(m)
                std.update(s)
            self.opt.mean = {k: float(v) for k, v in mean.items()}
            self.opt.std = {k: float(v) for k, v in std.items()}
        return nwp, env, dq, rp, rp_his
  411. def saveVar(self, path, data):
  412. os.makedirs(os.path.dirname(path), exist_ok=True)
  413. with open(path, 'wb') as file:
  414. pickle.dump(data, file)
  415. def normalize(self, df, drop_list=['C_TIME'], mean=None, std=None):
  416. df1 = df.copy()
  417. drop_index = [list(df1).index(x) for x in drop_list]
  418. df1.drop(drop_list, axis=1, inplace=True, errors='ignore')
  419. df1 = df1.apply(pd.to_numeric, errors='ignore')
  420. if mean is None or std is None:
  421. mean = np.mean(df1, axis=0).round(3) # 数据的均值
  422. std = np.std(df1, axis=0).round(3) # 标准差
  423. new = []
  424. for i, row in df1.iterrows():
  425. d = (row - mean) / std # 归一化
  426. new.append(d.round(3))
  427. if len(new) > 0:
  428. new = pd.concat(new, axis=1).T
  429. for index, col_name in zip(drop_index, drop_list):
  430. new.insert(index, col_name, df[col_name].values)
  431. return mean, std, new
if __name__ == '__main__':
    # Library module: no standalone CLI behavior.
    pass