app_gunicorn.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # time: 2023/3/27 16:29
  4. # file: app.py.py
  5. # author: David
  6. # company: shenyang JY
  7. import os
  8. import numpy as np
  9. np.random.seed(42)
  10. import pandas as pd
  11. from flask import Flask, request
  12. from startup import start_up
  13. from cache.clocking import Clock
  14. import gunicorn.app.base
  15. import threading
  16. import json, time
  17. from datetime import datetime
  18. app = Flask(__name__)
  19. with app.app_context():
  20. # import tensorflow as tf
  21. # global graph, sess
  22. # tf.compat.v1.set_random_seed(1234)
  23. # graph = tf.compat.v1.get_default_graph()
  24. # session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1)
  25. # sess = tf.compat.v1.Session(graph=graph, config=session_conf)
  26. logger, va, args, req, process, features, fmi = start_up() # 程序初始化
  27. # model = fmi.fmi_model
  28. # 实例化定时任务类
  29. clock = Clock(logger=logger, args=args, process=process, features=features, fmi=fmi)
  30. logger.info("定时任务类初始化")
  31. # clock.calculate_coe(cluster=True) # 实际场站中要先修模
  32. clock.update_thread() # 定时任务开启
  33. result = {
  34. "errorCode": 1,
  35. "msg": "无异常",
  36. "res": []
  37. }
  38. @app.before_request
  39. def update_config():
  40. print("-----------------beofore_request------------------")
  41. global opt
  42. opt = args.parse_args_and_yaml()
  43. va.opt = opt
  44. process.opt = opt
  45. va.status = 0
  46. class StandaloneApplication(gunicorn.app.base.BaseApplication):
  47. def __init__(self, app, options=None):
  48. self.options = options or {}
  49. self.application = app
  50. super().__init__()
  51. def load_config(self):
  52. config = {key: value for key, value in self.options.items()
  53. if key in self.cfg.settings and value is not None}
  54. for key, value in config.items():
  55. self.cfg.set(key.lower(), value)
  56. def load(self):
  57. return self.application
  58. @app.route('/neu', methods=['post'])
  59. def cdq():
  60. try:
  61. start = time.time()
  62. # 初始化请求处理类
  63. nwp, dq, history_dq, history_rp, env = req.get_form_data(request)
  64. print("----进入预处理算法----")
  65. history_rp = va.validate_power(history_rp, env)
  66. history_rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
  67. history_rp.rename(columns={'NEW_RP': 'C_REAL_VALUE'}, inplace=True)
  68. s1 = time.time()
  69. logger.info(f"1解析数据验证-执行时间:{(s1 - start) * 1000}毫秒")
  70. nwp = pd.merge(nwp, dq, on='C_TIME')
  71. his = pd.merge(history_rp, history_dq, on='C_TIME')
  72. his = pd.merge(env, his, on='C_TIME')
  73. nwp = va.validate_nwp(nwp)
  74. his = va.validate_env(his)
  75. va.validate_authentic(dq, history_dq)
  76. start1 = time.time()
  77. logger.info(f"2解析数据验证-执行时间:{(start1 - s1) * 1000}毫秒")
  78. mean = [opt.mean.get(x) for x in opt.nwp_columns if x not in ['C_TIME']]
  79. std = [opt.std.get(x) for x in opt.nwp_columns if x not in ['C_TIME']]
  80. nwp = nwp[opt.nwp_columns]
  81. _, _, nwp_features = clock.normalize(nwp, mean=mean, std=std)
  82. mean = [opt.mean.get(x) for x in opt.env_columns if x not in ['C_TIME']]
  83. std = [opt.std.get(x) for x in opt.env_columns if x not in ['C_TIME']]
  84. his = his[opt.env_columns]
  85. _, _, env_features = clock.normalize(his, mean=mean, std=std)
  86. start2 = time.time()
  87. logger.info(f"归一化-执行时间:{(start2 - start1) * 1000}毫秒")
  88. data_test, env = process.get_test_data(nwp_features, env_features)
  89. test_X = features.get_realtime_data(data_test, env)
  90. start3 = time.time()
  91. logger.info(f"特征处理-执行时间:{(start3 - start2) * 1000}毫秒")
  92. logger.info("-----进入超短期预测算法-----")
  93. # with graph.as_default():
  94. # with sess.as_default():
  95. res = fmi.fmi_model.predict(test_X, batch_size=1)[0]
  96. start4 = time.time()
  97. logger.info(f"算法推理-执行时间:{(start4 - start3) * 1000}毫秒")
  98. # res = fmi.fmi_model.predict(opt, test_X)[0]
  99. res = np.array([r*opt.std['C_REAL_VALUE'] + opt.mean['C_REAL_VALUE'] for r in res])
  100. res = np.array([r*opt.calculate['coe'] + opt.calculate['abs'] for r in res])
  101. res[res < 0] = 0
  102. res[res > opt.cap] = opt.cap
  103. res = np.around(res, decimals=2)
  104. times = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S').values
  105. res = [{"C_TIME": times[i], "CDQ_VALUE": x} for i, x in enumerate(res)]
  106. end = time.time()
  107. logger.info(f"反归一化-执行时间:{(end - start4) * 1000}毫秒")
  108. print(f"总时间:{(end - start) * 1000}毫秒")
  109. logger.info("----{}".format(res))
  110. result["errorCode"] = 1
  111. result["res"] = res
  112. result["msg"] = "无异常"
  113. return json.dumps(result, ensure_ascii=False)
  114. except Exception as e:
  115. logger.error(e.args)
  116. result["errorCode"] = va.status if va.status != 1 else 0
  117. result["res"] = None
  118. result["msg"] = e.args
  119. return json.dumps(result, ensure_ascii=False)
  120. @app.route('/forecastVersion', methods=['get'])
  121. def forecast_version():
  122. opt = args.parse_args_and_yaml()
  123. return opt.version
  124. def date_diff(current_dt, repair_dt):
  125. format_pattern = '%Y-%m-%d'
  126. difference = (datetime.strptime(current_dt, format_pattern) - datetime.strptime(repair_dt, format_pattern))
  127. return difference.days
  128. @app.route('/last_model_update', methods=['get'])
  129. def last_model_update():
  130. dt = time.strftime('%Y-%m-%d', time.localtime(time.time()))
  131. repair, repair_dt = int(opt.repair_model_cycle), opt.authentication['repair']
  132. if repair_dt == 'null':
  133. return {"model_status": 0, "time": 'null', "msg": "未修模"}
  134. elif date_diff(dt, repair_dt) > repair*2:
  135. return {"model_status": 1, "time": repair_dt, "msg": "距上次修模已过{}天".format(date_diff(dt, repair_dt))}
  136. else:
  137. return {"model_status": 2, "time": repair_dt, "msg": "修模正常"}
  138. if __name__ == "__main__":
  139. opt = args.parse_args_and_yaml()
  140. current_path = os.path.dirname(__file__)
  141. gunicorn_config = {
  142. 'bind': '%s:%s' % ('0.0.0.0', str(opt.port)),
  143. 'certfile': current_path + '/ssl/server.pem',
  144. 'keyfile': current_path + '/ssl/server.key',
  145. "check_config": True,
  146. "worker_class": "gthread",
  147. "workers": 1,
  148. "threads": 1,
  149. 'timeout': 100,
  150. "loglevel": "info",
  151. "access_log_format": "gunicorn %(h)s - %(t)s - %(r)s - %(s)s - %(f)s",
  152. "backlog": 30,
  153. }
  154. threading.Thread(target=clock.calculate_coe, args=(True,)).start()
  155. # # 启动服务
  156. StandaloneApplication(app, options=gunicorn_config).run()
  157. # app.run(host='0.0.0.0', port=7999, debug=False)