forecast_ust.py 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/3/27 16:29
  4. # file: app.py.py
  5. # author: David
  6. # company: shenyang JY
  7. import os
  8. import numpy as np
  9. np.random.seed(42)
  10. import pandas as pd
  11. from flask import Flask, request, g
  12. from startup import start_up
  13. from cache.clocking import Clock
  14. import threading
  15. import json, time
  16. from datetime import datetime
  17. app = Flask(__name__)
  18. with app.app_context():
  19. import tensorflow as tf
  20. global graph, sess
  21. tf.compat.v1.set_random_seed(1234)
  22. graph = tf.compat.v1.get_default_graph()
  23. session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
  24. sess = tf.compat.v1.Session(graph=graph, config=session_conf)
  25. logger, va, args, req, process, features, fmi, fix = start_up(graph, sess) # 程序初始化
  26. # 实例化定时任务类
  27. clock = Clock(logger=logger, args=args, va=va, process=process, features=features, fmi=fmi, fix=fix)
  28. logger.info("定时任务类初始化")
  29. # clock.calculate_coe(cluster=True) # 实际场站中要先修模
  30. clock.update_thread() # 定时任务开启
  31. result = {
  32. "errorCode": 1,
  33. "msg": "无异常",
  34. "res": []
  35. }
  36. @app.before_request
  37. def update_config():
  38. # print("-----------------beofore_request------------------")
  39. opt = args.parse_args_and_yaml()
  40. g.opt = opt
  41. va.opt = opt
  42. process.opt = opt
  43. features.opt = opt
  44. fix.opt = opt
  45. # va.status = 0
  46. # class StandaloneApplication(gunicorn.app.base.BaseApplication):
  47. # def __init__(self, app, options=None):
  48. # self.options = options or {}
  49. # self.application = app
  50. # super().__init__()
  51. #
  52. # def load_config(self):
  53. # config = {key: value for key, value in self.options.items()
  54. # if key in self.cfg.settings and value is not None}
  55. # for key, value in config.items():
  56. # self.cfg.set(key.lower(), value)
  57. #
  58. # def load(self):
  59. # return self.application
  60. integral = 0
  61. @app.route('/cdq', methods=['post'])
  62. def cdq():
  63. try:
  64. opt = g.opt
  65. # 初始化请求处理类
  66. start = time.time()
  67. history_dq, history_rp, env, nwp, dq = req.get_form_data(request)
  68. his = va.validate_his_data(history_rp, env, history_dq).reset_index(drop=True)
  69. print("----进入预处理算法----")
  70. history_rp = va.validate_power(his)
  71. history_rp.rename(columns={'NEW_RP': 'C_REAL_VALUE'}, inplace=True)
  72. his.drop(columns=['C_REAL_VALUE'], axis=1, inplace=True)
  73. his = pd.merge(his, history_rp, on='C_TIME')
  74. s1 = time.time()
  75. logger.info(f"测光-信号限电处理-执行时间:{(s1 - start) * 1000:.2f}毫秒")
  76. nwp = pd.merge(nwp, dq, on='C_TIME')
  77. nwp = va.validate_nwp(nwp)
  78. nwp = process.get_predict_data(nwp, dq)
  79. va.status = 0
  80. va.validate_authentic(dq, history_dq)
  81. start1 = time.time()
  82. logger.info(f"数据验证-执行时间:{(start1 - s1) * 1000:.2f}毫秒")
  83. mean = [opt.mean.get(x) for x in opt.nwp_columns if x not in ['C_TIME']]
  84. std = [opt.std.get(x) for x in opt.nwp_columns if x not in ['C_TIME']]
  85. nwp = nwp[opt.nwp_columns]
  86. _, _, nwp_features = clock.normalize(nwp, mean=mean, std=std)
  87. if len(nwp_features) > opt.Model["output_size"]:
  88. nwp_features = nwp_features.head(opt.Model["output_size"])
  89. dq = dq.head(opt.Model["output_size"])
  90. mean = [opt.mean.get(x) for x in opt.env_columns if x not in ['C_TIME']]
  91. std = [opt.std.get(x) for x in opt.env_columns if x not in ['C_TIME']]
  92. his = his[opt.env_columns]
  93. _, _, env_features = clock.normalize(his, mean=mean, std=std)
  94. start2 = time.time()
  95. logger.info(f"归一化-执行时间:{(start2 - start1) * 1000:.2f}毫秒")
  96. test_X = features.get_realtime_data([nwp_features], env_features)
  97. start3 = time.time()
  98. logger.info(f"预处理及特征处理-执行时间:{(start3 - start2) * 1000:.2f}毫秒")
  99. logger.info("-----进入超短期预测算法-----")
  100. res = fmi.predict(test_X)[0][1]
  101. res = np.array([r * opt.std['C_REAL_VALUE'] + opt.mean['C_REAL_VALUE'] for r in res])
  102. res[res < 0] = 0 # 如果出现负数,置为0
  103. res[res > opt.cap] = opt.cap # 出现大于实际装机量的数,置为实际装机量
  104. res = np.around(res, decimals=2)
  105. start4 = time.time()
  106. logger.info(f"算法推理-执行时间:{(start4 - start3) * 1000:.2f}毫秒")
  107. dq_res = fix.history_error(history_dq, history_rp, dq)
  108. dq_res['dq_fix'] = res
  109. res = fix.cdq(dq_res)
  110. end = time.time()
  111. logger.info(f"生成超短期-执行时间:{(end - start4) * 1000:.2f}毫秒")
  112. logger.info(f"总时间:{(end - start) * 1000:.2f}毫秒")
  113. logger.info("----{}".format(res))
  114. va.status = 1
  115. result["errorCode"] = 1
  116. result["res"] = res
  117. result["msg"] = "无异常"
  118. return json.dumps(result, ensure_ascii=False)
  119. except Exception as e:
  120. global integral
  121. logger.error(e.args)
  122. if va.status == 2 and integral > 60:
  123. va.status = 3
  124. integral = 0
  125. result["errorCode"] = va.status if va.status != 1 else 0
  126. result["res"] = None
  127. result["msg"] = e.args[0]
  128. return json.dumps(result, ensure_ascii=False)
  129. @app.route('/forecastVersion', methods=['get'])
  130. def forecast_version():
  131. return g.opt.version
  132. def date_diff(current_dt, repair_dt):
  133. format_pattern = '%Y-%m-%d'
  134. difference = (datetime.strptime(current_dt, format_pattern) - datetime.strptime(repair_dt, format_pattern))
  135. return difference.days
  136. @app.route('/last_model_update', methods=['get'])
  137. def last_model_update():
  138. dt = time.strftime('%Y-%m-%d', time.localtime(time.time()))
  139. repair, repair_dt = int(g.opt.repair_model_cycle), g.opt.authentication['repair']
  140. if repair_dt == 'null':
  141. return {"model_status": 0, "time": 'null', "msg": "neu算法:未修模"}
  142. elif date_diff(dt, repair_dt) > repair*2:
  143. return {"model_status": 1, "time": repair_dt, "msg": "neu算法:距上次修模{}天".format(date_diff(dt, repair_dt))}
  144. elif va.status != 1:
  145. status_msg = {2: "环境数据缺失", 3: "重载环境数据"}
  146. global integral
  147. if va.status == 2 and integral <= 60:
  148. integral += 1
  149. return {"model_status": 1, "time": repair_dt, "msg": "neu算法:接口状态{}".format(status_msg.get(va.status, '检查'))}
  150. else:
  151. return {"model_status": 2, "time": repair_dt, "msg": "neu算法:修模正常"}
  152. if __name__ == "__main__":
  153. # current_path = os.path.dirname(__file__)
  154. # opt = args.parse_args_and_yaml()
  155. # gunicorn_config = {
  156. # 'bind': '%s:%s' % ('0.0.0.0', str(opt.port)),
  157. # 'certfile': current_path + '/ssl/server.pem',
  158. # 'keyfile': current_path + '/ssl/server.key',
  159. # "check_config": True,
  160. # "worker_class": "gthread",
  161. # "workers": 1,
  162. # "threads": 2,
  163. # 'timeout': 60,
  164. # "loglevel": "info",
  165. # "access_log_format": "gunicorn %(h)s - %(t)s - %(r)s - %(s)s - %(f)s",
  166. # "backlog": 30,
  167. # }
  168. # threading.Thread(target=clock.calculate_coe, args=(True,)).start()
  169. # # # 启动服务
  170. # app.run(host='0.0.0.0', port=opt.port, debug=False, ssl_context=(current_path + '/ssl/server.pem', current_path + '/ssl/server.key'))
  171. clock.calculate_coe(True)