#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :tf_lstm.py
# @Time     :2025/2/12 14:03
# @Author   :David
# @Company  :shenyang JY
import os.path
from threading import Lock

import numpy as np
import tensorflow as tf
from keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, BatchNormalization, Flatten, Dropout
from keras.models import Model, load_model
from keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard
from keras import optimizers, regularizers
import keras.backend as K

from common.database_dml import get_h5_model_from_mongo
from models_processing.losses.loss_cdq import SouthLoss, NorthEastLoss

np.random.seed(42)
tf.compat.v1.set_random_seed(1234)

model_lock = Lock()


def rmse(y_true, y_pred):
    return K.sqrt(K.mean(K.square(y_pred - y_true)))


var_dir = os.path.dirname(os.path.dirname(__file__))


class TSHandler(object):
    model = None
    train = False

    def __init__(self, logger):
        self.logger = logger
        self.model = None

    def get_model(self, args):
        """
        Load the stored model under a lock (singleton-style) so that
        asynchronous/concurrent loads stay thread-safe.
        """
        try:
            with model_lock:
                self.model = get_h5_model_from_mongo(args, {'rmse': rmse})
        except Exception as e:
            self.logger.info("Failed to load model weights: {}".format(e.args))

    @staticmethod
    def get_keras_model(opt):
        # db_loss = NorthEastLoss(opt)
        south_loss = SouthLoss(opt)
        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])

        # NWP branch: Conv1D -> MaxPooling1D -> LSTM -> Dense
        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
        con1 = Conv1D(filters=64, kernel_size=5, strides=1, padding='valid', activation='relu',
                      kernel_regularizer=l2_reg)(nwp_input)
        nwp = MaxPooling1D(pool_size=5, strides=1, padding='valid', data_format='channels_last')(con1)
        nwp_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=False, kernel_regularizer=l2_reg)(nwp)

        output = Dense(opt.Model['output_size'], name='cdq_output')(nwp_lstm)

        model = Model(nwp_input, output)
        adam = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
        model.compile(loss=south_loss, optimizer=adam)
        return model

    def train_init(self, opt):
        try:
            if opt.Model['add_train']:
                # Incremental training: start from the previously stored model so it can be refined.
                base_train_model = get_h5_model_from_mongo(vars(opt), {'rmse': rmse})
                base_train_model.summary()
                self.logger.info("Loaded base model for incremental training")
            else:
                base_train_model = self.get_keras_model(opt)
            return base_train_model
        except Exception as e:
            self.logger.info("Failed to load model weights for incremental training: {}".format(e.args))
            raise

    def training(self, opt, train_and_valid_data):
        model = self.train_init(opt)
        model.summary()
        train_x, train_y, valid_x, valid_y = train_and_valid_data
        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'],
                            verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop])
        loss = np.round(history.history['loss'], decimals=5)
        val_loss = np.round(history.history['val_loss'], decimals=5)
        self.logger.info("----- model training ran for {} epochs -----".format(len(loss)))
        self.logger.info("training loss: {}".format(loss))
        self.logger.info("validation loss: {}".format(val_loss))
        return model

    def predict(self, test_X, batch_size=1):
        result = self.model.predict(test_X, batch_size=batch_size)
        self.logger.info("prediction executed")
        return result


if __name__ == "__main__":
    run_code = 0
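
    # --- Usage sketch (illustrative only, not part of the original module) ---
    # A minimal end-to-end run of the same Conv1D -> MaxPooling1D -> LSTM -> Dense
    # stack that get_keras_model builds, fitted on random stand-in data. The
    # config values, shapes, and variable names below are hypothetical, and the
    # model is compiled with this module's rmse loss instead of SouthLoss,
    # because SouthLoss needs the project's real config object, which is not
    # available here. Assumes the module's own imports resolve.
    time_step, input_size_nwp, hidden_size, output_size = 16, 24, 64, 16

    demo_input = Input(shape=(time_step, input_size_nwp), name='nwp')
    x = Conv1D(filters=64, kernel_size=5, strides=1, padding='valid', activation='relu')(demo_input)
    x = MaxPooling1D(pool_size=5, strides=1, padding='valid')(x)
    x = LSTM(units=hidden_size, return_sequences=False)(x)
    demo_output = Dense(output_size, name='cdq_output')(x)
    demo_model = Model(demo_input, demo_output)
    demo_model.compile(loss=rmse, optimizer=optimizers.Adam(learning_rate=0.001))

    # Random stand-in data: 128 training samples, 32 validation samples.
    train_x = np.random.rand(128, time_step, input_size_nwp).astype('float32')
    train_y = np.random.rand(128, output_size).astype('float32')
    valid_x = np.random.rand(32, time_step, input_size_nwp).astype('float32')
    valid_y = np.random.rand(32, output_size).astype('float32')

    demo_model.fit(train_x, train_y, batch_size=32, epochs=2, verbose=2,
                   validation_data=(valid_x, valid_y))
    print(demo_model.predict(valid_x[:4], batch_size=1).shape)  # expected: (4, 16)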