#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName  :tf_transformer.py
# @Time      :2025/5/08 14:03
# @Author    :David
# @Company: shenyang JY
"""Transformer-encoder regression model and its train/predict handler."""
# NOTE: import order is kept as in the original file, because the star import
# below may rebind names; anything imported after it takes precedence.
# `tf` is imported explicitly: previously it was only available if the star
# import from common.database_dml happened to re-export it.
import tensorflow as tf
from tensorflow.keras.initializers import glorot_uniform, orthogonal
from tensorflow.keras.layers import (
    Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape,
    Flatten, LayerNormalization, Dropout, Layer, Add, MultiHeadAttention,
)
from tensorflow.keras.models import Model, load_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
from tensorflow.keras import optimizers, regularizers
from models_processing.model_tf.losses import region_loss
import numpy as np
from common.database_dml import *
from models_processing.model_tf.settings import set_deterministic
from threading import Lock
import argparse

# Module-level lock shared by every TransformerHandler instance; serializes model loading.
model_lock = Lock()
set_deterministic(42)


class PositionalEncoding(tf.keras.layers.Layer):
    """Add a fixed sinusoidal positional encoding to its input (Keras-serializable).

    Args:
        max_len: Longest sequence length the precomputed table covers. The input's
            sequence axis (axis 1) must not exceed this.
        d_model: Size of the last (feature) axis; must match the input's last axis.
    """

    def __init__(self, max_len, d_model, **kwargs):
        super().__init__(**kwargs)
        # Keep the constructor arguments so get_config can serialize them.
        self.max_len = max_len
        self.d_model = d_model
        # The encoding table is a constant, computed once at construction time.
        self.position_embedding = self.positional_encoding(max_len, d_model)

    def get_angles(self, pos, i, d_model):
        """Return pos / 10000^(2*(i//2)/d_model), broadcast over pos and i."""
        angles = 1 / tf.pow(10000., (2 * (i // 2)) / tf.cast(d_model, tf.float32))
        return pos * angles

    def positional_encoding(self, max_len, d_model):
        """Build the (1, max_len, d_model) encoding table.

        Sine terms (even feature indices) and cosine terms (odd feature indices)
        are concatenated along the feature axis rather than interleaved.
        """
        angle_rads = self.get_angles(
            pos=tf.range(max_len, dtype=tf.float32)[:, tf.newaxis],
            i=tf.range(d_model, dtype=tf.float32)[tf.newaxis, :],
            d_model=d_model,
        )
        sines = tf.math.sin(angle_rads[:, 0::2])
        cosines = tf.math.cos(angle_rads[:, 1::2])
        pos_encoding = tf.concat([sines, cosines], axis=-1)
        return pos_encoding[tf.newaxis, ...]  # add a leading batch axis

    def call(self, inputs):
        # Slice the table to the input's actual sequence length (axis 1) and
        # let the leading size-1 axis broadcast over the batch.
        seq_len = tf.shape(inputs)[1]
        return inputs + self.position_embedding[:, :seq_len, :]

    def get_config(self):
        # Serialize the constructor args so load_model can rebuild the layer.
        config = super().get_config()
        config.update({
            'max_len': self.max_len,
            'd_model': self.d_model,
        })
        return config


class TransformerHandler(object):
    """Build, load, train and run the Transformer regression model.

    Args:
        logger: Logger used for status and error messages.
        args: Dict of options. It is wrapped in an argparse.Namespace; model
            hyper-parameters are read from its ``Model`` entry.
    """

    def __init__(self, logger, args):
        self.logger = logger
        self.opt = argparse.Namespace(**args)
        self.model = None
        self.model_params = None

    def get_model(self, args):
        """Load the model and its params from MongoDB into self.model / self.model_params.

        Loading runs under the module-level ``model_lock`` so concurrent
        (e.g. async) callers do not load at the same time. This is not a
        singleton: every call reloads. On failure the error is logged and the
        previous self.model is left untouched.
        """
        try:
            with model_lock:
                loss = region_loss(self.opt)
                # Custom objects needed to deserialize the saved Keras model.
                self.model, self.model_params = get_keras_model_from_mongo(
                    args, {type(loss).__name__: loss, 'PositionalEncoding': PositionalEncoding})
        except Exception as e:
            self.logger.info("加载模型权重失败:{}".format(e.args))

    @staticmethod
    def get_transformer_model(opt, time_series=1):
        """Build and compile a Transformer-encoder model with one output per time step.

        Reads from ``opt.Model``: ``time_step`` and ``input_size`` (required);
        ``hidden_size`` (64), ``num_heads`` (4), ``ff_dim`` (128),
        ``lambda_value_2`` (L2 weight, 0.01) and ``num_layers`` (2) are optional,
        with the defaults shown.

        Args:
            opt: Namespace with a ``Model`` dict of hyper-parameters.
            time_series: Multiplier on ``time_step`` giving the input sequence length.

        Returns:
            A compiled ``Model`` (MSE loss, Adam lr=1e-4). It maps inputs of
            shape (batch, time_step*time_series, input_size) to outputs of shape
            (batch, time_step*time_series).
        """
        hidden_size = opt.Model.get('hidden_size', 64)
        num_heads = opt.Model.get('num_heads', 4)
        ff_dim = opt.Model.get('ff_dim', 128)
        l2_reg = regularizers.l2(opt.Model.get('lambda_value_2', 0.01))

        seq_len = opt.Model['time_step'] * time_series
        nwp_input = Input(shape=(seq_len, opt.Model['input_size']))

        # Embedding: a 'same'-padded Conv1D keeps the sequence length and
        # projects features to hidden_size.
        x = Conv1D(hidden_size, kernel_size=3, padding='same', kernel_regularizer=l2_reg)(nwp_input)
        # The encoding table must cover the full input length (time_step * time_series).
        # Using only time_step breaks the addition whenever time_series > 1.
        x = PositionalEncoding(seq_len, hidden_size)(x)

        # Transformer encoder layers (post-norm residual blocks).
        for _ in range(opt.Model.get('num_layers', 2)):
            # Self-attention sub-layer. key_dim is the per-head size in Keras.
            residual = x
            x = MultiHeadAttention(num_heads=num_heads, key_dim=hidden_size)(x, x)
            x = Dropout(0.1)(x)
            x = Add()([residual, x])
            x = LayerNormalization()(x)

            # Position-wise feed-forward sub-layer.
            residual = x
            x = Dense(ff_dim, activation='relu')(x)
            x = Dense(hidden_size)(x)
            x = Dropout(0.1)(x)
            x = Add()([residual, x])
            x = LayerNormalization()(x)

        # One linear output per time step: (batch, seq, 1) -> (batch, seq).
        output = Dense(1, activation='linear')(x)
        output = Flatten(name='Flatten')(output)

        model = Model(nwp_input, output)
        model.compile(loss='mse', optimizer=optimizers.Adam(learning_rate=1e-4))
        return model

    def train_init(self):
        """Load a previously stored model from MongoDB as the base for further training.

        Returns:
            The loaded Keras model (also sets self.model_params), or False if
            loading failed (the error is logged).
        """
        try:
            loss = region_loss(self.opt)
            base_train_model, self.model_params = get_keras_model_from_mongo(
                vars(self.opt), {type(loss).__name__: loss, 'PositionalEncoding': PositionalEncoding})
            base_train_model.summary()
            self.logger.info("已加载加强训练基础模型")
            return base_train_model
        except Exception as e:
            self.logger.info("加载加强训练模型权重失败:{}".format(e.args))
            return False

    def training(self, model, train_and_valid_data):
        """Fit ``model`` with early stopping on validation loss and log the loss history.

        Args:
            model: Compiled Keras model.
            train_and_valid_data: Tuple ``(train_x, train_y, valid_x, valid_y)``.

        Returns:
            The same model object, after fitting.
        """
        model.summary()
        train_x, train_y, valid_x, valid_y = train_and_valid_data
        early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')
        # shuffle=False keeps the sample order as given.
        history = model.fit(train_x, train_y,
                            batch_size=self.opt.Model['batch_size'],
                            epochs=self.opt.Model['epoch'],
                            verbose=2,
                            validation_data=(valid_x, valid_y),
                            callbacks=[early_stop],
                            shuffle=False)
        loss = np.round(history.history['loss'], decimals=5)
        val_loss = np.round(history.history['val_loss'], decimals=5)
        self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
        self.logger.info("训练集损失函数为:{}".format(loss))
        self.logger.info("验证集损失函数为:{}".format(val_loss))
        return model

    def predict(self, test_x, batch_size=1):
        """Run self.model on ``test_x`` and return the raw prediction array."""
        result = self.model.predict(test_x, batch_size=batch_size)
        self.logger.info("执行预测方法")
        return result


if __name__ == "__main__":
    run_code = 0