#!/usr/bin/env python # -*- coding: utf-8 -*- # time: 2024/5/6 13:25 # file: time_series.py # author: David # company: shenyang JY import os.path from flask import Flask from keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, BatchNormalization, Flatten, Dropout, Reshape, Lambda, TimeDistributed from keras.models import Model, load_model from keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard from keras import optimizers, regularizers import keras.backend as K import numpy as np np.random.seed(42) from sloss import SouthLoss, NorthEastLoss import tensorflow as tf tf.compat.v1.set_random_seed(1234) from threading import Lock model_lock = Lock() def rmse(y_true, y_pred): return K.sqrt(K.mean(K.square(y_pred - y_true))) def mae(y_true, y_pred): return K.mean(K.abs(y_pred - y_true), axis=-1) var_dir = os.path.dirname(os.path.dirname(__file__)) class FMI(object): model = None train = False def __init__(self, log, args, graph, sess): self.logger = log self.graph = graph self.sess = sess opt = args.parse_args_and_yaml() with self.graph.as_default(): tf.compat.v1.keras.backend.set_session(self.sess) FMI.get_model(opt) @staticmethod def get_model(opt): """ 单例模式+线程锁,防止在异步加载时引发线程安全 """ try: if FMI.model is None or FMI.train is True: with model_lock: FMI.model = FMI.get_keras_model(opt) FMI.model.load_weights(os.path.join(var_dir, 'var', 'fmi.h5')) except Exception as e: print("加载模型权重失败:{}".format(e.args)) @staticmethod def get_keras_model(opt): db_loss = NorthEastLoss(opt) # south_loss = SouthLoss(opt) l1_reg = regularizers.l1(opt.Model['lambda_value_1']) l2_reg = regularizers.l2(opt.Model['lambda_value_2']) nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp') env_input = Input(shape=(opt.Model['his_points'], opt.Model['input_size_env']), name='env') con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input) d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1) nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1) con2 = Conv1D(filters=64, kernel_size=5, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(env_input) env = MaxPooling1D(pool_size=5, strides=1, padding='valid', data_format='channels_last')(con2) for i in range(opt.Model['lstm_layers']): rs = True if i == opt.Model['lstm_layers']-1: rs = False env = LSTM(units=opt.Model['hidden_size'], return_sequences=rs, name='env_lstm'+str(i), kernel_regularizer=l2_reg)(env) tiao = Dense(16, name='d4', kernel_regularizer=l1_reg)(env) if opt.Model['fusion']: nwpf = Flatten()(nwp) fusion = concatenate([nwpf, tiao]) else: fusion = Flatten()(nwp) output = Dense(opt.Model['output_size'], name='d5')(fusion) model = Model([env_input, nwp_input], output) adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True) model.compile(loss=rmse, optimizer=adam) return model def train_init(self, opt): tf.compat.v1.keras.backend.set_session(self.sess) model = FMI.get_keras_model(opt) try: if opt.Model['add_train'] and opt.authentication['repair'] != "null": # 进行加强训练,支持修模 model.load_weights(os.path.join(var_dir, 'var', 'fmi.h5')) self.logger.info("已加载加强训练基础模型") except Exception as e: self.logger.info("加强训练加载模型权重失败:{}".format(e.args)) model.summary() return model def training(self, opt, train_and_valid_data): model = self.train_init(opt) train_X, train_Y, valid_X, valid_Y = train_and_valid_data print("----------", np.array(train_X[0]).shape) print("++++++++++", np.array(train_X[1]).shape) # weight_lstm_1, bias_lstm_1 = model.get_layer('d1').get_weights() # print("weight_lstm_1 = ", weight_lstm_1) # print("bias_lstm_1 = ", bias_lstm_1) check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss', save_best_only=True, mode='auto') early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto') # tbCallBack = TensorBoard(log_dir='../figure', # histogram_freq=0, # write_graph=True, # write_images=True) history = model.fit(train_X, train_Y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2, validation_data=(valid_X, valid_Y), callbacks=[check_point, early_stop]) loss = np.round(history.history['loss'], decimals=5) val_loss = np.round(history.history['val_loss'], decimals=5) self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss))) self.logger.info("训练集损失函数为:{}".format(loss)) self.logger.info("验证集损失函数为:{}".format(val_loss)) self.logger.info("训练结束,原模型地址:{}".format(id(FMI.model))) with self.graph.as_default(): tf.compat.v1.keras.backend.set_session(self.sess) FMI.train = True FMI.get_model(opt) FMI.train = False self.logger.info("保护线程,加载模型,地址:{}".format(id(FMI.model))) def predict(self, test_X, batch_size=1): with self.graph.as_default(): with self.sess.as_default(): result = FMI.model.predict(test_X, batch_size=batch_size) self.logger.info("执行预测方法") return result