- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- # @FileName :tf_lstm.py
- # @Time :2025/2/12 14:03
- # @Author :David
- # @Company: shenyang JY
- from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten, Add, Multiply, Concatenate
- from tensorflow.keras.models import Model, load_model
- from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
- from tensorflow.keras import optimizers, regularizers
- from models_processing.model_tf.losses import region_loss
- import numpy as np
- from common.database_dml_koi import *
- from models_processing.model_tf.settings import set_deterministic
- from threading import Lock
- import argparse
- model_lock = Lock()
- set_deterministic(42)
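- # Hedged sketch (assumption; the real implementation lives in models_processing.model_tf.settings):
- # set_deterministic(42) above is expected to seed every RNG involved so repeated runs are
- # reproducible. The helper below is illustrative only and deliberately uses a different name.
- def _set_deterministic_sketch(seed):
-     import os, random
-     import tensorflow as tf
-     os.environ['PYTHONHASHSEED'] = str(seed)   # conventional, though only fully effective if set before interpreter start
-     random.seed(seed)
-     np.random.seed(seed)
-     tf.random.set_seed(seed)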
- class TSHandler(object):
- def __init__(self, logger, args):
- self.logger = logger
- self.opt = argparse.Namespace(**args)
- self.model = None
- self.model_params = None
- def get_model(self, args):
- """
- Singleton pattern + thread lock, to avoid thread-safety issues when the model is loaded asynchronously.
- """
- try:
- with model_lock:
- loss = region_loss(self.opt)
- self.model, self.model_params = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
- except Exception as e:
- self.logger.info("Failed to load model weights: {}".format(e.args))
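- # Illustrative sketch (not part of the original class): one way to realize the
- # "singleton + lock" loading described in get_model's docstring. `loader` is a
- # hypothetical stand-in for get_keras_model_from_mongo; concurrent callers
- # serialize on the lock and the model is materialized at most once.
- class _ModelOnce(object):
-     """Sketch: thread-safe, load-once holder mirroring the locking used in get_model."""
-     def __init__(self, loader):
-         self._loader = loader
-         self._lock = Lock()
-         self._model = None
-     def get(self):
-         with self._lock:                 # serialize concurrent loaders
-             if self._model is None:      # only the first caller pays the loading cost
-                 self._model = self._loader()
-         return self._model
- # usage sketch: holder = _ModelOnce(lambda: get_keras_model_from_mongo(args, custom_objects)); model, params = holder.get()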
- @staticmethod
- def get_keras_model_20250515(opt, time_series=1, lstm_type=1):
- loss = region_loss(opt)
- l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
- l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
- nwp_input = Input(shape=(opt.Model['time_step']*time_series, opt.Model['input_size']), name='nwp')
- con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
- con1_p = MaxPooling1D(pool_size=1, strides=1, padding='valid', data_format='channels_last')(con1)
- nwp_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=True, kernel_regularizer=l2_reg)(con1_p)
- zone = Dense(len(opt.zone), name='zone')(nwp_lstm)
- zonef = Flatten()(zone)
- if lstm_type == 2:
- output = Dense(opt.Model['time_step'], name='cdq_output')(zonef)
- else:
- output = Dense(opt.Model['time_step']*time_series, name='cdq_output')(zonef)
- model = Model(nwp_input, [zone, output])
- adam = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
- model.compile(loss={"zone": loss, "cdq_output": loss}, loss_weights={"zone": 0.5, "cdq_output": 0.5}, optimizer=adam)
- return model
- @staticmethod
- def get_keras_model(opt, time_series=1, lstm_type=1):
- loss = region_loss(opt)
- l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
- l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
- nwp_input = Input(shape=(opt.Model['time_step'] * time_series, opt.Model['input_size']), name='nwp')
- con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu',
- kernel_regularizer=l2_reg)(nwp_input)
- con1_p = MaxPooling1D(pool_size=1, strides=1, padding='valid', data_format='channels_last')(con1)
- nwp_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=True, kernel_regularizer=l2_reg)(con1_p)
- # Independent feature extraction for zone A / zone B
- zone_a = LSTM(units=32, return_sequences=True, kernel_regularizer=l2_reg)(nwp_lstm)  # independent LSTM branch per zone
- zone_b = LSTM(units=32, return_sequences=True, kernel_regularizer=l2_reg)(nwp_lstm)
- # Dynamic weighting gate
- gate = Dense(2, activation='softmax')(nwp_lstm)  # learns the relative importance of the two zones automatically
- weighted_zone = Add()([
- Multiply()([gate[:, :, 0:1], zone_a]),
- Multiply()([gate[:, :, 1:2], zone_b])
- ])
- zone_pred = Dense(len(opt.zone), name='zone')(weighted_zone)
- zone_flat = Flatten()(zone_pred)
- # Cross layer added after zone_flat
- cross_input = Concatenate()([zone_flat, nwp_lstm[:, -1, :]])  # fuse zone features with the LSTM state at the last time step
- cross_layer = Dense(64, activation='relu')(cross_input)
- # Final output layer
- if lstm_type == 2:
- output = Dense(opt.Model['time_step'], name='cdq_output')(cross_layer)
- else:
- output = Dense(opt.Model['time_step'] * time_series, name='cdq_output')(cross_layer)
- model = Model(nwp_input, [zone_pred, output])
- adam = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
- model.compile(loss={"zone": loss, "cdq_output": loss}, loss_weights={"zone": 0.7, "cdq_output": 0.3},
- optimizer=adam)
- return model
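- # Illustrative sketch (assumption, not part of the original model code): the dynamic
- # zone gating built above, re-expressed in plain numpy. A softmax gate produces
- # per-time-step weights for the two zone branches and the fused feature is their
- # weighted sum, mirroring the Multiply/Add combination of gate, zone_a and zone_b.
- @staticmethod
- def _gated_zone_fusion_sketch(zone_a_feat, zone_b_feat, gate_logits):
-     """zone_*_feat: (batch, time, units); gate_logits: (batch, time, 2); returns (batch, time, units)."""
-     e = np.exp(gate_logits - gate_logits.max(axis=-1, keepdims=True))
-     gate = e / e.sum(axis=-1, keepdims=True)                           # numerically stable softmax over the two zones
-     return gate[..., 0:1] * zone_a_feat + gate[..., 1:2] * zone_b_feat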
- def train_init(self):
- try:
- # Continued training on top of a stored base model, to support model refitting
- loss = region_loss(self.opt)
- base_train_model, self.model_params = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
- base_train_model.summary()
- self.logger.info("Loaded base model for continued training")
- return base_train_model
- except Exception as e:
- self.logger.info("Failed to load base model weights for continued training: {}".format(e.args))
- return False
- def training(self, model, train_and_valid_data):
- model.summary()
- train_x, train_y, valid_x, valid_y = train_and_valid_data
- early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')
- history = model.fit(train_x, train_y, batch_size=self.opt.Model['batch_size'], epochs=self.opt.Model['epoch'],
- verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
- loss = np.round(history.history['loss'], decimals=5)
- val_loss = np.round(history.history['val_loss'], decimals=5)
- self.logger.info("----- Model training ran for {} epochs -----".format(len(loss)))
- self.logger.info("Training loss per epoch: {}".format(loss))
- self.logger.info("Validation loss per epoch: {}".format(val_loss))
- return model
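- # Illustrative sketch (assumed placeholder shapes, not project data): the model compiled
- # above has two named heads, so the train_y / valid_y fed to training() can be dicts keyed
- # by output name. time_step=16, input_size=24 and n_zone=2 are demonstration values only.
- @staticmethod
- def _make_dummy_dataset_sketch(n=32, time_step=16, input_size=24, n_zone=2):
-     x = np.random.rand(n, time_step, input_size).astype('float32')
-     y = {'zone': np.random.rand(n, time_step, n_zone).astype('float32'),
-          'cdq_output': np.random.rand(n, time_step).astype('float32')}
-     return x, y
- # e.g. train_x, train_y = TSHandler._make_dummy_dataset_sketch()
- #      valid_x, valid_y = TSHandler._make_dummy_dataset_sketch(n=8)
- #      trained = handler.training(handler.get_keras_model(opt), (train_x, train_y, valid_x, valid_y))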
- def predict(self, test_x, batch_size=1):
- result = self.model.predict(test_x, batch_size=batch_size)
- self.logger.info("Running prediction")
- return result
- if __name__ == "__main__":
- run_code = 0