#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/3/27 16:29
# file: app.py.py
# author: David
# company: shenyang JY
"""Flask entry point for the ultra-short-term power forecast service.

Routes defined here:

* ``POST /neu``               -- run a forecast on the posted data.
* ``GET  /forecastVersion``   -- return the configured version string.
* ``GET  /last_model_update`` -- report when the model was last repaired.

Importing this module calls ``start_up()`` and starts the ``Clock`` update
thread, so the import itself has side effects.
"""
import json
import os
import threading
import time
from datetime import datetime

import numpy as np

# Seed NumPy before pandas and the project modules are imported, exactly as
# the original file did.
np.random.seed(42)

import pandas as pd
from flask import Flask, request

from startup import start_up
from cache.clocking import Clock

app = Flask(__name__)

with app.app_context():
    # Program initialisation.
    logger, va, args, req, process, features, fmi = start_up()
    # Instantiate the scheduled-task class.
    clock = Clock(logger=logger, args=args, process=process,
                  features=features, fmi=fmi)
    logger.info("定时任务类初始化")
    # Original note: at a real plant the model has to be repaired first,
    # i.e. clock.calculate_coe(cluster=True) before starting the tasks.
    clock.update_thread()  # start the scheduled tasks

# Default response envelope.  Handlers copy it per request instead of
# mutating it, so one request can never observe another request's payload.
result = {
    "errorCode": 1,
    "msg": "无异常",
    "res": []
}


@app.before_request
def update_config():
    """Reload the configuration and reset the validator before each request.

    Rebinds the module-level ``opt`` and pushes it into ``va`` and
    ``process``; ``va.status`` is reset to 0.
    """
    global opt
    print("-----------------beofore_request------------------")
    opt = args.parse_args_and_yaml()
    va.opt = opt
    process.opt = opt
    va.status = 0


def _normalized_features(frame, columns):
    """Select ``columns`` from ``frame`` and normalise them with ``clock``.

    The mean/std lists are looked up in ``opt`` for every column except
    ``C_TIME``.  Returns the third element of ``clock.normalize``'s result.
    """
    stat_columns = [name for name in columns if name != 'C_TIME']
    mean = [opt.mean.get(name) for name in stat_columns]
    std = [opt.std.get(name) for name in stat_columns]
    _, _, normalized = clock.normalize(frame[columns], mean=mean, std=std)
    return normalized


@app.route('/neu', methods=['post'])
def cdq():
    """Run the forecast pipeline for the posted request.

    Returns:
        A JSON string with keys ``errorCode``, ``msg`` and ``res``.  On
        success ``res`` is a list of ``{"C_TIME": ..., "CDQ_VALUE": ...}``
        items; on failure ``res`` is ``None``, ``msg`` carries the
        exception args and ``errorCode`` is ``va.status`` (or 0 when
        ``va.status`` is 1).
    """
    try:
        start = time.time()
        # Parse the request payload into its data frames.
        nwp, dq, history_dq, history_rp, env = req.get_form_data(request)
        print("----进入预处理算法----")
        history_rp = va.validate_power(history_rp, env)
        # Replace the raw power column with the validated one.
        history_rp.drop(['C_REAL_VALUE'], axis=1, inplace=True)
        history_rp.rename(columns={'NEW_RP': 'C_REAL_VALUE'}, inplace=True)
        s1 = time.time()
        logger.info(f"1解析数据验证-执行时间:{(s1 - start) * 1000}毫秒")

        nwp = pd.merge(nwp, dq, on='C_TIME')
        his = pd.merge(history_rp, history_dq, on='C_TIME')
        his = pd.merge(env, his, on='C_TIME')
        nwp = va.validate_nwp(nwp)
        his = va.validate_env(his)
        va.validate_authentic(dq, history_dq)
        start1 = time.time()
        logger.info(f"2解析数据验证-执行时间:{(start1 - s1) * 1000}毫秒")

        nwp_features = _normalized_features(nwp, opt.nwp_columns)
        env_features = _normalized_features(his, opt.env_columns)
        start2 = time.time()
        logger.info(f"归一化-执行时间:{(start2 - start1) * 1000}毫秒")

        data_test, env = process.get_test_data(nwp_features, env_features)
        test_X = features.get_realtime_data(data_test, env)
        start3 = time.time()
        logger.info(f"特征处理-执行时间:{(start3 - start2) * 1000}毫秒")

        logger.info("-----进入超短期预测算法-----")
        res = fmi.fmi_model.predict(test_X, batch_size=1)[0]
        start4 = time.time()
        logger.info(f"算法推理-执行时间:{(start4 - start3) * 1000}毫秒")

        # De-normalise, apply the correction coefficients and bound the
        # result to [0, cap].  Working in float64 and emitting built-in
        # floats keeps the values JSON-serialisable regardless of the
        # dtype the model returns.
        res = np.asarray(res, dtype=np.float64)
        res = res * opt.std['C_REAL_VALUE'] + opt.mean['C_REAL_VALUE']
        res = res * opt.calculate['coe'] + opt.calculate['abs']
        res = np.clip(res, 0, opt.cap)
        res = np.around(res, decimals=2)

        times = dq['C_TIME'].dt.strftime('%Y-%m-%d %H:%M:%S').values
        res = [{"C_TIME": times[i], "CDQ_VALUE": value}
               for i, value in enumerate(res.tolist())]
        end = time.time()
        logger.info(f"反归一化-执行时间:{(end - start4) * 1000}毫秒")
        print(f"总时间:{(end - start) * 1000}毫秒")
        logger.info("----{}".format(res))

        response = dict(result, errorCode=1, msg="无异常", res=res)
        return json.dumps(response, ensure_ascii=False)
    except Exception as e:
        # Top-level request boundary: report the failure in the response
        # body instead of letting the request fail.
        logger.error(e.args)
        error_code = va.status if va.status != 1 else 0
        response = dict(result, errorCode=error_code, msg=e.args, res=None)
        # default=str keeps the handler from failing on exception args
        # that are not JSON-serialisable.
        return json.dumps(response, ensure_ascii=False, default=str)


@app.route('/forecastVersion', methods=['get'])
def forecast_version():
    """Return ``version`` from a freshly parsed configuration."""
    return args.parse_args_and_yaml().version


def date_diff(current_dt: str, repair_dt: str) -> int:
    """Return the number of whole days from ``repair_dt`` to ``current_dt``.

    Both arguments are ``'%Y-%m-%d'`` strings; ``datetime.strptime`` raises
    ``ValueError`` for anything else.
    """
    format_pattern = '%Y-%m-%d'
    current = datetime.strptime(current_dt, format_pattern)
    repaired = datetime.strptime(repair_dt, format_pattern)
    return (current - repaired).days


@app.route('/last_model_update', methods=['get'])
def last_model_update():
    """Report the model-repair status.

    ``model_status`` is 0 when no repair date is recorded (``'null'``),
    1 when more than twice ``repair_model_cycle`` days have passed since
    the last repair, and 2 otherwise.
    """
    repair_dt = opt.authentication['repair']
    if repair_dt == 'null':
        return {"model_status": 0, "time": 'null', "msg": "未修模"}

    repair = int(opt.repair_model_cycle)
    elapsed_days = date_diff(time.strftime('%Y-%m-%d'), repair_dt)
    if elapsed_days > repair * 2:
        return {"model_status": 1, "time": repair_dt,
                "msg": "距上次修模已过{}天".format(elapsed_days)}
    return {"model_status": 2, "time": repair_dt, "msg": "修模正常"}


if __name__ == "__main__":
    opt = args.parse_args_and_yaml()
    # Run the coefficient calculation in the background while the server
    # process is being launched.
    threading.Thread(target=clock.calculate_coe, args=(True,)).start()
    # Alternative kept from the original (commented out there as well):
    # app.run(host='0.0.0.0', port=9008, debug=False)
    init_file = './app.ini'
    # NOTE(review): uWSGI documents ``--ini`` for loading an ini file;
    # ``--init`` looks like a typo -- confirm before changing the command.
    os.system("uwsgi --init {}".format(init_file))