#!/usr/bin/env python # -*- coding: utf-8 -*- # time: 2023/3/27 16:29 # file: app.py.py # author: David # company: shenyang JY import os import numpy as np np.random.seed(42) import pandas as pd from flask import Flask, request, g from startup import start_up from cache.clocking import Clock import threading import json, time from datetime import datetime app = Flask(__name__) with app.app_context(): import tensorflow as tf global graph, sess tf.compat.v1.set_random_seed(1234) graph = tf.compat.v1.get_default_graph() session_conf = tf.compat.v1.ConfigProto(intra_op_parallelism_threads=1, inter_op_parallelism_threads=1) sess = tf.compat.v1.Session(graph=graph, config=session_conf) logger, va, args, req, process, features, fmi, fix = start_up(graph, sess) # 程序初始化 # 实例化定时任务类 clock = Clock(logger=logger, args=args, va=va, process=process, features=features, fmi=fmi, fix=fix) logger.info("定时任务类初始化") # clock.calculate_coe(cluster=True) # 实际场站中要先修模 clock.update_thread() # 定时任务开启 result = { "errorCode": 1, "msg": "无异常", "res": [] } @app.before_request def update_config(): # print("-----------------beofore_request------------------") opt = args.parse_args_and_yaml() g.opt = opt va.opt = opt process.opt = opt features.opt = opt fix.opt = opt # va.status = 0 # class StandaloneApplication(gunicorn.app.base.BaseApplication): # def __init__(self, app, options=None): # self.options = options or {} # self.application = app # super().__init__() # # def load_config(self): # config = {key: value for key, value in self.options.items() # if key in self.cfg.settings and value is not None} # for key, value in config.items(): # self.cfg.set(key.lower(), value) # # def load(self): # return self.application integral = 0 @app.route('/cdq', methods=['post']) def cdq(): try: opt = g.opt # 初始化请求处理类 start = time.time() history_dq, history_rp, env, nwp, dq = req.get_form_data(request) his = va.validate_his_data(history_rp, env, history_dq).reset_index(drop=True) print("----进入预处理算法----") history_rp = va.validate_power(his) history_rp.rename(columns={'NEW_RP': 'C_REAL_VALUE'}, inplace=True) his.drop(columns=['C_REAL_VALUE'], axis=1, inplace=True) his = pd.merge(his, history_rp, on='C_TIME') s1 = time.time() logger.info(f"测光-信号限电处理-执行时间:{(s1 - start) * 1000:.2f}毫秒") nwp = pd.merge(nwp, dq, on='C_TIME') nwp = va.validate_nwp(nwp) nwp = process.get_predict_data(nwp, dq) va.status = 0 va.validate_authentic(dq, history_dq) start1 = time.time() logger.info(f"数据验证-执行时间:{(start1 - s1) * 1000:.2f}毫秒") mean = [opt.mean.get(x) for x in opt.nwp_columns if x not in ['C_TIME']] std = [opt.std.get(x) for x in opt.nwp_columns if x not in ['C_TIME']] nwp = nwp[opt.nwp_columns] _, _, nwp_features = clock.normalize(nwp, mean=mean, std=std) if len(nwp_features) > opt.Model["output_size"]: nwp_features = nwp_features.head(opt.Model["output_size"]) dq = dq.head(opt.Model["output_size"]) mean = [opt.mean.get(x) for x in opt.env_columns if x not in ['C_TIME']] std = [opt.std.get(x) for x in opt.env_columns if x not in ['C_TIME']] his = his[opt.env_columns] _, _, env_features = clock.normalize(his, mean=mean, std=std) start2 = time.time() logger.info(f"归一化-执行时间:{(start2 - start1) * 1000:.2f}毫秒") test_X = features.get_realtime_data([nwp_features], env_features) start3 = time.time() logger.info(f"预处理及特征处理-执行时间:{(start3 - start2) * 1000:.2f}毫秒") logger.info("-----进入超短期预测算法-----") res = fmi.predict(test_X)[0][1] res = np.array([r * opt.std['C_REAL_VALUE'] + opt.mean['C_REAL_VALUE'] for r in res]) res[res < 0] = 0 # 如果出现负数,置为0 res[res > opt.cap] = opt.cap # 出现大于实际装机量的数,置为实际装机量 res = np.around(res, decimals=2) start4 = time.time() logger.info(f"算法推理-执行时间:{(start4 - start3) * 1000:.2f}毫秒") dq_res = fix.history_error(history_dq, history_rp, dq) dq_res['dq_fix'] = res res = fix.cdq(dq_res) end = time.time() logger.info(f"生成超短期-执行时间:{(end - start4) * 1000:.2f}毫秒") logger.info(f"总时间:{(end - start) * 1000:.2f}毫秒") logger.info("----{}".format(res)) va.status = 1 result["errorCode"] = 1 result["res"] = res result["msg"] = "无异常" return json.dumps(result, ensure_ascii=False) except Exception as e: global integral logger.error(e.args) if va.status == 2 and integral > 60: va.status = 3 integral = 0 result["errorCode"] = va.status if va.status != 1 else 0 result["res"] = None result["msg"] = e.args[0] return json.dumps(result, ensure_ascii=False) @app.route('/forecastVersion', methods=['get']) def forecast_version(): return g.opt.version def date_diff(current_dt, repair_dt): format_pattern = '%Y-%m-%d' difference = (datetime.strptime(current_dt, format_pattern) - datetime.strptime(repair_dt, format_pattern)) return difference.days @app.route('/last_model_update', methods=['get']) def last_model_update(): dt = time.strftime('%Y-%m-%d', time.localtime(time.time())) repair, repair_dt = int(g.opt.repair_model_cycle), g.opt.authentication['repair'] if repair_dt == 'null': return {"model_status": 0, "time": 'null', "msg": "neu算法:未修模"} elif date_diff(dt, repair_dt) > repair*2: return {"model_status": 1, "time": repair_dt, "msg": "neu算法:距上次修模{}天".format(date_diff(dt, repair_dt))} elif va.status != 1: status_msg = {2: "环境数据缺失", 3: "重载环境数据"} global integral if va.status == 2 and integral <= 60: integral += 1 return {"model_status": 1, "time": repair_dt, "msg": "neu算法:接口状态{}".format(status_msg.get(va.status, '检查'))} else: return {"model_status": 2, "time": repair_dt, "msg": "neu算法:修模正常"} if __name__ == "__main__": current_path = os.path.dirname(__file__) opt = args.parse_args_and_yaml() gunicorn_config = { 'bind': '%s:%s' % ('0.0.0.0', str(opt.port)), 'certfile': current_path + '/ssl/server.pem', 'keyfile': current_path + '/ssl/server.key', "check_config": True, "worker_class": "gthread", "workers": 1, "threads": 2, 'timeout': 60, "loglevel": "info", "access_log_format": "gunicorn %(h)s - %(t)s - %(r)s - %(s)s - %(f)s", "backlog": 30, } threading.Thread(target=clock.calculate_coe, args=(True,)).start() # # 启动服务 # StandaloneApplication(app, options=gunicorn_config).run() app.run(host='0.0.0.0', port=opt.port, debug=False, ssl_context=(current_path + '/ssl/server.pem', current_path + '/ssl/server.key'))