#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :tf_lstm.py
# @Time     :2025/2/12 14:03
# @Author   :David
# @Company  :shenyang JY

from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
from tensorflow.keras import optimizers, regularizers
from tensorflow.keras.layers import BatchNormalization, GlobalAveragePooling1D, Dropout, Add, Concatenate, Multiply
from models_processing.model_tf.losses import region_loss
import numpy as np
from common.database_dml_koi import *
from models_processing.model_tf.settings import set_deterministic
from threading import Lock
import argparse

model_lock = Lock()
set_deterministic(42)


class TSHandler(object):
    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)
        self.model = None
        self.model_params = None

    def get_model(self, args):
        """
        Singleton pattern + thread lock: prevents thread-safety issues when the
        model is loaded asynchronously.
        """
        try:
            with model_lock:
                loss = region_loss(self.opt)
                self.model, self.model_params = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
        except Exception as e:
            self.logger.info("Failed to load model weights: {}".format(e.args))

    @staticmethod
    def get_keras_model(opt):
        """Optimized renewable-energy power forecasting model.

        Key improvements:
        1. Multi-scale feature extraction
        2. Attention mechanism
        3. Residual connections
        4. Elastic-net regularization
        5. Adaptive learning-rate scheduling
        """
        # Regularization configuration
        l1_l2_reg = regularizers.l1_l2(
            l1=opt.Model['lambda_value_1'],
            l2=opt.Model['lambda_value_2']
        )

        # Input layer
        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp_input')

        # %% Multi-scale feature extraction module
        def multi_scale_block(input_layer):
            # Parallel convolution paths
            conv3 = Conv1D(64, 3, padding='causal', activation='relu')(input_layer)
            conv5 = Conv1D(64, 5, padding='causal', activation='relu')(input_layer)
            return Concatenate()([conv3, conv5])

        # Feature backbone
        x = multi_scale_block(nwp_input)

        # %% Residual attention module
        def residual_attention_block(input_layer, filters):
            # Main path
            y = Conv1D(filters, 3, padding='same', activation='relu')(input_layer)
            y = BatchNormalization()(y)

            # Attention gating
            attention = Dense(filters, activation='sigmoid')(y)
            y = Multiply()([y, attention])

            # Residual connection
            shortcut = Conv1D(filters, 1, padding='same')(input_layer)
            return Add()([y, shortcut])

        x = residual_attention_block(x, 128)
        x = Dropout(0.3)(x)

        # %% Feature aggregation
        x = GlobalAveragePooling1D()(x)  # used instead of Flatten to preserve temporal information

        # %% Tunable fully connected layers
        x = Dense(256, activation='swish', kernel_regularizer=l1_l2_reg)(x)
        x = BatchNormalization()(x)
        x = Dropout(0.5)(x)

        # %% Output layer (extensible to probabilistic forecasting)
        output = Dense(16, activation='linear', name='main_output')(x)

        # Probabilistic extension (optional)
        # variance = Dense(1, activation='softplus')(x)  # predicted variance
        # output = Concatenate()([output, variance])

        # %% Model compilation
        model = Model(inputs=nwp_input, outputs=output)

        # Adaptive optimizer configuration
        adam = optimizers.Adam(
            learning_rate=opt.Model['learning_rate'],
            beta_1=0.92,  # tuned momentum parameter
            beta_2=0.999,
            epsilon=1e-07,
            amsgrad=True
        )

        # Compile configuration (assumes region_loss is defined)
        model.compile(
            loss=region_loss(opt),   # custom loss function
            optimizer=adam,
            metrics=['mae', 'mse']   # monitored metrics
        )
        return model

    def train_init(self):
        try:
            # Continued (reinforced) training, supports model repair
            loss = region_loss(self.opt)
            base_train_model, self.model_params = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
            base_train_model.summary()
            self.logger.info("Loaded base model for continued training")
            return base_train_model
        except Exception as e:
            self.logger.info("Failed to load model weights: {}".format(e.args))

    def training(self, model, train_and_valid_data):
        model.summary()
        train_x, train_y, valid_x, valid_y = train_and_valid_data

        # Callback configuration
        callbacks = [
            EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], restore_best_weights=True),
            ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=8, min_lr=1e-7)
        ]

        history = model.fit(train_x, train_y,
                            batch_size=self.opt.Model['batch_size'],
                            epochs=self.opt.Model['epoch'],
                            verbose=2,
                            validation_data=(valid_x, valid_y),
                            callbacks=callbacks,
                            shuffle=False)
        loss = np.round(history.history['loss'], decimals=5)
        val_loss = np.round(history.history['val_loss'], decimals=5)
        self.logger.info("----- Model training ran for {} epochs -----".format(len(loss)))
        self.logger.info("Training loss: {}".format(loss))
        self.logger.info("Validation loss: {}".format(val_loss))
        return model

    def predict(self, test_x, batch_size=1):
        result = self.model.predict(test_x, batch_size=batch_size)
        self.logger.info("Running prediction")
        return result


if __name__ == "__main__":
    run_code = 0
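
    # ------------------------------------------------------------------
    # Usage sketch (illustrative only; the hyperparameter values, logger
    # setup, and the names `demo_args` / `handler` below are assumptions,
    # not part of the production pipeline). It shows the `args` structure
    # TSHandler reads: a 'Model' dict carrying the keys consumed by
    # get_keras_model() and training() above. Inputs are expected as
    # (N, time_step, input_size) arrays and targets as (N, 16) to match
    # the Dense(16) output head.
    # ------------------------------------------------------------------
    import logging

    demo_args = {
        'Model': {
            'time_step': 16,         # length of the NWP input window (placeholder value)
            'input_size': 24,        # number of NWP features per step (placeholder value)
            'lambda_value_1': 1e-5,  # L1 regularization weight
            'lambda_value_2': 1e-4,  # L2 regularization weight
            'learning_rate': 1e-3,
            'batch_size': 64,
            'epoch': 100,
            'patience': 10,
        }
    }
    handler = TSHandler(logging.getLogger(__name__), demo_args)

    # Building and training the model additionally requires region_loss(opt)
    # to accept this opt and real train/valid arrays, so those calls are only
    # sketched here:
    # model = TSHandler.get_keras_model(handler.opt)
    # handler.training(model, (train_x, train_y, valid_x, valid_y))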