from flask import Flask, request
import time
import logging
import traceback
import numpy as np
from itertools import chain
from common.database_dml import get_data_from_mongo, insert_data_into_mongo, get_h5_model_from_mongo, get_scaler_model_from_mongo
from common.processing_data_common import str_to_list
from common.alert import send_message
from datetime import date, timedelta
import pandas as pd

app = Flask('model_prediction_lstm——service')

# Module-level logger: the request handler uses it, so it must exist even when
# this module is served by something other than the ``__main__`` block below.
# (Handlers/levels are still configured by ``logging.basicConfig`` there.)
logger = logging.getLogger("model_prediction_lstm log")


def create_sequences(data_features, data_target, time_steps):
    """Build sliding-window samples (time-series sequences) for the LSTM.

    Each sample in X is ``time_steps`` consecutive rows of ``data_features``.
    When ``data_target`` is non-empty, the matching y value is the target at
    the last row of that window. If there are fewer rows than
    ``time_steps``, the whole feature array becomes one (shorter) sample.

    Args:
        data_features: indexable sequence/array of feature rows.
        data_target: indexable targets aligned with ``data_features``; pass an
            empty sequence (as all callers in this file do) to skip y.
        time_steps: window length.

    Returns:
        tuple[np.ndarray, np.ndarray]: ``(X, y)``; ``y`` is empty when no
        target is supplied.
    """
    X, y = [], []
    has_target = len(data_target) > 0
    if len(data_features) < time_steps:
        X.append(data_features)
        if has_target:
            # NOTE(review): mirrors the windowed branch (target at the last
            # row); callers in this file never pass a target — confirm.
            y.append(data_target[len(data_features) - 1])
    else:
        for i in range(len(data_features) - time_steps + 1):
            X.append(data_features[i:i + time_steps])
            if has_target:
                y.append(data_target[i + time_steps - 1])
    return np.array(X), np.array(y)


def _predict_power(df, args, features, time_steps, col_time, feature_scaler, target_scaler):
    """Run the stored LSTM over ``df`` and return its tail with a clipped ``power_forecast`` column."""
    df = df.sort_values(by=col_time).ffill().bfill()
    scaled_features = feature_scaler.transform(df[features])
    X_predict, _ = create_sequences(scaled_features, [], time_steps)
    model = get_h5_model_from_mongo(args)
    # The flat prediction vector is passed to the scaler as a single row and
    # flattened back out (kept as-is; presumably matches how target_scaler
    # was fit — confirm).
    y_scaled = model.predict(X_predict).flatten()
    y_predict = list(chain.from_iterable(target_scaler.inverse_transform([y_scaled])))
    # Each window predicts the value at its last row, so the predictions line
    # up with the last len(y_predict) rows of df. .copy() avoids writing into
    # a slice of df (SettingWithCopyWarning).
    result = df.iloc[-len(y_predict):].copy()
    result['power_forecast'] = y_predict
    # Negative power is not physical; clip it to zero.
    result.loc[result['power_forecast'] < 0, 'power_forecast'] = 0
    return result


def forecast_data_distribution(pre_data, args):
    """Produce the day-ahead power forecast table from NWP data.

    Branches:
      * no NWP rows: send an alert and return an empty table;
      * required feature columns missing (after renaming): send an alert and
        pass through the ``power_forecast`` already in ``pre_data``
        (presumably the DQ forecast, per the alert text — confirm);
      * fewer than 96 rows (one per 15 minutes) for tomorrow: send an alert,
        reindex onto a 15-minute grid between the first and last timestamps,
        forward/back fill the gaps and pass ``power_forecast`` through;
      * otherwise: predict with the LSTM model stored in Mongo.

    Args:
        pre_data: NWP DataFrame; must contain ``args['col_time']`` (as
            strings), ``date_time``, ``farm_id`` and, for the fallback
            branches, ``power_forecast``.
        args: request parameters (``features``, ``time_steps``, ``col_time``,
            ``farmId`` and whatever the Mongo helpers read).

    Returns:
        DataFrame with columns ``date_time``, ``farm_id``, ``power_forecast``;
        negative forecasts are clipped to 0.
    """
    features = str_to_list(args['features'])
    time_steps = int(args['time_steps'])
    col_time = args['col_time']
    feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
    tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
    # Rename NWP columns to the feature names the model was trained with.
    field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd', 'surface_pressure': 'surfacePressure'}
    pre_data = pre_data.rename(columns=field_mapping)
    missing_features = set(features) - set(pre_data.columns)
    output_cols = ['date_time', 'farm_id', 'power_forecast']

    if len(pre_data) == 0:
        send_message('lstm预测组件', args['farmId'], '请注意:获取NWP数据为空,预测文件无法生成!')
        # Use the same column names as the final projection so an empty
        # result never raises KeyError when col_time != 'date_time'.
        result = pd.DataFrame({'date_time': [], 'farm_id': [], 'power_forecast': []})
    elif len(missing_features) > 0:
        send_message('lstm预测组件', args['farmId'], f'NWP特征列缺失!features:{missing_features}')
        result = pre_data[output_cols].copy()
    elif len(pre_data[pre_data[col_time].str.contains(tomorrow)]) < 96:
        send_message('lstm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
        start_time = pre_data[col_time].min()
        end_time = pre_data[col_time].max()
        date_range = pd.date_range(start=start_time, end=end_time, freq='15min').strftime('%Y-%m-%d %H:%M:%S').tolist()
        df_date = pd.DataFrame({col_time: date_range})
        merged = pd.merge(df_date, pre_data, how='left', on=col_time).sort_values(by=col_time).ffill().bfill()
        result = merged[output_cols].copy()
    else:
        result = _predict_power(pre_data, args, features, time_steps, col_time, feature_scaler, target_scaler)

    result.loc[result['power_forecast'] < 0, 'power_forecast'] = 0
    return result[output_cols]


def model_prediction(df, args):
    """Predict power for ``df`` with the stored LSTM model.

    Rows flagged ``is_limit`` (when that column exists) are dropped first.

    Args:
        df: input DataFrame containing the feature columns, ``args['col_time']``,
            ``dateTime`` and ``args['target']``.
        args: request parameters (``features``, ``time_steps``, ``col_time``,
            ``model_name``, ``howLongAgo``, ``farm_id``, ``target`` plus
            whatever the Mongo helpers read).

    Returns:
        DataFrame with columns ``dateTime``, ``howLongAgo``, ``model``,
        ``farm_id``, ``power_forecast`` and the target column; negative
        forecasts are clipped to 0.
    """
    if 'is_limit' in df.columns:
        df = df[df['is_limit'] == False]  # noqa: E712 — element-wise pandas comparison
    features = str_to_list(args['features'])
    time_steps = int(args['time_steps'])
    col_time = args['col_time']
    model_name = args['model_name']
    how_long_ago = int(args['howLongAgo'])
    farm_id = args['farm_id']
    target = args['target']
    feature_scaler, target_scaler = get_scaler_model_from_mongo(args)

    result = _predict_power(df, args, features, time_steps, col_time, feature_scaler, target_scaler)
    result['howLongAgo'] = how_long_ago
    result['farm_id'] = farm_id
    result['model'] = model_name
    return result[['dateTime', 'howLongAgo', 'model', 'farm_id', 'power_forecast', target]]


@app.route('/model_prediction_lstm', methods=['POST'])
def model_prediction_lstm():
    """HTTP entry point: load data, predict, and write the predictions to Mongo.

    ``forecast_file == 1`` selects the day-ahead NWP path
    (``forecast_data_distribution``); any other value runs
    ``model_prediction``.

    Returns:
        dict with ``success`` (1/0), ``args``, ``start_time``, ``end_time``
        and, on failure, ``msg`` holding the tab-joined traceback.
    """
    start_time = time.time()
    result = {}
    success = 0
    # Pre-bind so the response can still be built if reading the request fails.
    args = {}
    print("Program starts execution!")
    try:
        args = request.values.to_dict()
        print('args', args)
        logger.info(args)
        forecast_file = int(args['forecast_file'])
        power_df = get_data_from_mongo(args)
        if forecast_file == 1:
            predict_data = forecast_data_distribution(power_df, args)
        else:
            predict_data = model_prediction(power_df, args)
        insert_data_into_mongo(predict_data, args)
        success = 1
    except Exception:
        logger.exception("model_prediction_lstm failed")
        # Flatten the traceback onto one line for the JSON response.
        result['msg'] = traceback.format_exc().replace("\n", "\t")
    end_time = time.time()

    result['success'] = success
    result['args'] = args
    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
    print("Program execution ends!")
    return result


if __name__ == "__main__":
    print("Program starts execution!")
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    from waitress import serve
    # serve() blocks until shutdown, so announce startup before calling it.
    print("server start!")
    serve(app, host="0.0.0.0", port=10097)