- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- # time: 2024/5/6 13:25
- # file: time_series.py
- # author: David
- # company: shenyang JY
- import os.path
- from keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, BatchNormalization, Flatten, Dropout
- from keras.models import Model, load_model
- from keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard
- from keras import optimizers, regularizers
- import keras.backend as K
- import numpy as np
- from sqlalchemy.ext.instrumentation import find_native_user_instrumentation_hook
np.random.seed(42)  # fix NumPy RNG so training runs are reproducible
from cache.sloss import SouthLoss, NorthEastLoss
import tensorflow as tf
tf.compat.v1.set_random_seed(1234)  # fix TF graph-level seed as well
from threading import Lock
# Guards lazy (re)loading of the shared FMI.model singleton across threads.
model_lock = Lock()
def rmse(y_true, y_pred):
    """Root-mean-square error computed with Keras backend ops (usable as a metric/loss)."""
    squared_error = K.square(y_true - y_pred)
    return K.sqrt(K.mean(squared_error))
# Project root (parent of this file's directory); model weights live under <root>/var/fmi.h5.
var_dir = os.path.dirname(os.path.dirname(__file__))
class FMI(object):
    """
    Wrapper around a two-branch Keras forecasting model.

    Holds one shared model instance at class level (singleton), guarded by
    ``model_lock`` during (re)loading, and pins all Keras calls to an
    externally supplied TF graph/session.
    """
    # Shared singleton Keras model (lazily built/loaded by get_model).
    model = None
    # When True, get_model reloads weights even though a model is cached
    # (set around retraining; see training()).
    train = False

    def __init__(self, log, args, graph, sess):
        """
        :param log: logger used for all status messages
        :param args: config object; args.parse_args_and_yaml() yields the options
        :param graph: tf.Graph the model is bound to
        :param sess: tf.compat.v1.Session used for all model operations
        """
        self.logger = log
        self.graph = graph
        self.sess = sess
        opt = args.parse_args_and_yaml()
        with self.graph.as_default():
            # Keras must use the shared session so weights live in `sess`.
            tf.compat.v1.keras.backend.set_session(self.sess)
            FMI.get_model(opt)
- @staticmethod
- def get_model(opt):
- """
- 单例模式+线程锁,防止在异步加载时引发线程安全
- """
- try:
- if FMI.model is None or FMI.train is True:
- with model_lock:
- FMI.model = FMI.get_keras_model(opt)
- FMI.model.load_weights(os.path.join(var_dir, 'var', 'fmi.h5'))
- except Exception as e:
- print("加载模型权重失败:{}".format(e.args))
- @staticmethod
- def get_keras_model(opt):
- db_loss = NorthEastLoss(opt)
- south_loss = SouthLoss(opt)
- l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
- l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
- nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
- env_input = Input(shape=(opt.Model['his_points'], opt.Model['input_size_env']), name='env')
- con1 = Conv1D(filters=64, kernel_size=5, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
- nwp = MaxPooling1D(pool_size=5, strides=1, padding='valid', data_format='channels_last')(con1)
- nwp_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=False, kernel_regularizer=l2_reg)(nwp)
- con2 = Conv1D(filters=64, kernel_size=5, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(env_input)
- env = MaxPooling1D(pool_size=5, strides=1, padding='valid', data_format='channels_last')(con2)
- env_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=False, name='env_lstm',kernel_regularizer=l2_reg)(env)
- tiao = Dense(4, name='d4', kernel_regularizer=l1_reg)(env_lstm)
- if opt.Model['fusion']:
- fusion = concatenate([nwp_lstm, tiao])
- else:
- fusion = nwp_lstm
- output = Dense(opt.Model['output_size'], name='cdq_output')(fusion)
- model = Model([env_input, nwp_input], output)
- adam = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
- model.compile(loss=south_loss, optimizer=adam)
- return model
- def train_init(self, opt):
- tf.compat.v1.keras.backend.set_session(self.sess)
- model = FMI.get_keras_model(opt)
- try:
- if opt.Model['add_train'] and opt.authentication['repair'] != "null":
- # 进行加强训练,支持修模
- model.load_weights(os.path.join(var_dir, 'var', 'fmi.h5'))
- self.logger.info("已加载加强训练基础模型")
- except Exception as e:
- self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
- model.summary()
- return model
- def training(self, opt, train_and_valid_data):
- model = self.train_init(opt)
- train_X, train_Y, valid_X, valid_Y = train_and_valid_data
- print("----------", np.array(train_X[0]).shape)
- print("++++++++++", np.array(train_X[1]).shape)
- # weight_lstm_1, bias_lstm_1 = model.get_layer('d1').get_weights()
- # print("weight_lstm_1 = ", weight_lstm_1)
- # print("bias_lstm_1 = ", bias_lstm_1)
- check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss',
- save_best_only=True, mode='auto')
- early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
- # tbCallBack = TensorBoard(log_dir='../figure',
- # histogram_freq=0,
- # write_graph=True,
- # write_images=True)
- history = model.fit(train_X, train_Y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,
- validation_data=(valid_X, valid_Y), callbacks=[check_point, early_stop])
- loss = np.round(history.history['loss'], decimals=5)
- val_loss = np.round(history.history['val_loss'], decimals=5)
- self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
- self.logger.info("训练集损失函数为:{}".format(loss))
- self.logger.info("验证集损失函数为:{}".format(val_loss))
- self.logger.info("训练结束,原模型地址:{}".format(id(FMI.model)))
- with self.graph.as_default():
- tf.compat.v1.keras.backend.set_session(self.sess)
- FMI.train = True
- FMI.get_model(opt)
- FMI.train = False
- self.logger.info("保护线程,加载模型,地址:{}".format(id(FMI.model)))
- def predict(self, test_X, batch_size=1):
- with self.graph.as_default():
- with self.sess.as_default():
- result = FMI.model.predict(test_X, batch_size=batch_size)
- self.logger.info("执行预测方法")
- return result
- def train_custom(self, train_X, train_Y, model, opt):
- epochs = opt.Model['epoch']
- batch_size = opt.Model['batch_size']
- num_batches = len(train_X) // batch_size # 取整
- optimizer = tf.keras.optimizers.Adam(learning_rate=opt.Model[""])
- for epoch in range(epochs):
- for batch_index in range(epochs):
- start = batch_index * batch_size
- end = start + batch_size
- x_batch, y_batch = train_X[start: end], train_Y[start: end]
- with tf.GradientTape() as tape:
- res = model(x_batch)
- loss = rmse(y_batch, res)
- gradients = tape.gradient(loss, model.trainable_variables)
|