Browse files

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei 3 months ago
parent
commit
a7aac17fe3

+ 2 - 1
data_processing/data_operation/data_handler.py

@@ -190,6 +190,7 @@ class DataHandler(object):
         # Standardize features and target
         scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features])
         scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
+        scaled_cap = target_scaler.transform(np.array([[self.opt.cap]]))[0,0]
         train_data_cleaned[features] = scaled_train_data
         train_data_cleaned[[target]] = scaled_target
         # 3. Fill missing values
@@ -208,7 +209,7 @@ class DataHandler(object):
             train_x, valid_x, train_y, valid_y =  np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
         else:
             train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
-        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes
+        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
 
     def pre_data_handler(self, data, feature_scaler, bp_data=False):
         """

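Note on the new scaled_cap: it maps the installed capacity into the same normalized space as the target, so the region-specific losses can compare predictions against it directly. A minimal sketch of that transform, assuming target_scaler is a fitted sklearn MinMaxScaler (the fit data and values below are illustrative):

    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    # Illustrative fit: target power observed in [0, 50] MW
    target_scaler = MinMaxScaler()
    target_scaler.fit(np.array([[0.0], [50.0]]))

    cap = 50.0  # matches `cap: 50.0` in the YAML configs below
    scaled_cap = target_scaler.transform(np.array([[cap]]))[0, 0]
    print(scaled_cap)  # 1.0 for this illustrative fit
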
+ 0 - 141
models_processing/losses/loss_cdq.py

@@ -1,141 +0,0 @@
-#!/usr/bin/env python
-# -*- coding: utf-8 -*-
-# time: 2023/5/8 13:15
-# file: loss.py.py
-# author: David
-# company: shenyang JY
-import tensorflow as tf
-tf.compat.v1.set_random_seed(1234)
-
-
-def rmse(y_true, y_pred):
-    return tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))
-
-class SouthLoss(tf.keras.losses.Loss):
-    def __init__(self, opt, name='south_loss'):
-        """
-        Loss function for the new China Southern Power Grid rules
-        :param cap: installed capacity
-        """
-        super(SouthLoss, self).__init__(name=name)
-        self.cap = opt.cap*0.2    # cap is not normalized here; it must be normalized first
-        self.opt = opt
-        # self.cap01 = opt.cap*0.1
-
-    def call(self, y_true, y_predict):
-        """
-        Called automatically
-        :param y_true: labels
-        :param y_predict: predictions
-        :return: loss value
-        """
-        # Difference between actual and predicted values
-        # y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        # y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        y_true = y_true[:, 15]
-        y_predict = y_predict[:, 15]
-        diff = y_true - y_predict
-        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
-        base = logistic_values * y_true + (1-logistic_values)*self.cap
-        loss = K.square(diff/base)
-        # loss = K.mean(loss, axis=-1)
-        return loss
-
-    def call2(self, y_true, y_predict):
-        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        y_true = y_true[:, 15]
-        y_predict = y_predict[:, 15]
-        diff = y_true - y_predict
-        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
-        base = logistic_values * y_true + (1 - logistic_values) * self.cap
-        loss = K.square(diff / base)
-
-        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01), tf.greater(y_predict, self.cap01))
-        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
-        safe_count = tf.maximum(count, 1)
-        # reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
-        mean_loss = loss / safe_count
-        return mean_loss
-
-    def call1(self, y_true, y_predict):
-        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true)*self.cap)
-        loss = (y_true - y_predict) / base
-        squared_loss = tf.square(loss)
-        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
-        return  mean_squared_loss
-
-
-class NorthEastLoss(tf.keras.losses.Loss):
-    def __init__(self, opt, name='northeast_loss'):
-        """
-        Ultra-short-term loss function for the new Northeast grid rules
-        """
-        super(NorthEastLoss, self).__init__(name=name)
-        self.opt = opt
-        self.cap = round(opt.cap*0.1, 2)
-
-    def call(self, y_true, y_predict):
-        # Add a small epsilon here to avoid division by zero
-        # y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        # y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-
-        mask_logical = tf.logical_and(tf.greater(y_true, self.cap), tf.greater(y_predict, self.cap))
-        # mask = tf.cast(~mask_logical, tf.float32)
-        # y_true = y_true * (1 - mask) + 0 * mask
-        # y_predict = y_predict * (1 - mask) + 0 * mask
-
-
-        epsilon = tf.keras.backend.epsilon()
-        y_predict_safe = y_predict + epsilon
-
-        # Compute (y_true - y_predict) / y_predict_safe
-        difference_over_predict = tf.abs(y_predict - y_true) / tf.abs(y_predict_safe)
-
-        # Clamp values >= 1 to 1; keep the rest unchanged
-        masked_difference = tf.where(difference_over_predict >= 1, tf.ones_like(difference_over_predict)*1, difference_over_predict)  # tf.where is element-wise and preserves differentiability of the tensor elements
-
-        # Sum along the feature axis here; averaging (tf.reduce_mean instead of tf.reduce_sum) would also work
-        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
-        sum_diff = tf.reduce_sum(masked_difference, axis=-1)
-        # mean_loss = tf.reduce_mean(masked_difference, axis=[1])
-        safe_count = tf.maximum(count, 1)
-        mean = sum_diff / safe_count
-        mean1 = tf.reduce_sum(masked_difference, axis=-1)
-        return mean
-
-
-class NorthWestLoss(tf.keras.losses.Loss):
-    def __init__(self, name='northwest_loss'):
-        """
-        Ultra-short-term loss function for the new Northwest grid rules
-        """
-        super(NorthWestLoss, self).__init__(name=name)
-
-    def call(self, y_true, y_pred):
-        # Ensure predictions and ground truth are floats
-        y_pred = tf.cast(y_pred, tf.float32)
-        y_true = tf.cast(y_true, tf.float32)
-
-        # Avoid division-by-zero errors
-        epsilon = 1e-8
-        y_pred_adjusted = y_pred + epsilon
-        y_true_adjusted = y_true + epsilon
-
-        # Compute |Pr - Pn|
-        abs_diff = tf.abs(y_pred - y_true)
-
-        # Compute the sum of |Pr - Pn|
-        sum_abs_diff = tf.reduce_sum(abs_diff)
-
-        # Weight of each difference: |Pr - Pn| / sum(|Pr - Pn|)
-        weights = abs_diff / (sum_abs_diff + epsilon)  # add epsilon to avoid division by zero
-
-        # Compute |Pr/(Pr + Pn) - 0.5|
-        ratios = tf.abs((y_pred_adjusted / (y_pred_adjusted + y_true_adjusted)) - 0.5)
-
-        # Compute the final loss value
-        loss = 1.0 - 2.0 * tf.reduce_sum(ratios * weights)
-        return loss

+ 1 - 2
models_processing/model_koi/bp.yaml

@@ -15,14 +15,13 @@ Model:
   output_size: 16
   patience: 10
   predict_data_fill: true
-  region: south129
   shuffle_train_data: false
   test_data_fill: false
   time_step: 16
   train_data_fill: false
   use_cuda: false
   valid_data_rate: 0.15
-
+region: south
 calculate: []
 cap: 50.0
 dataloc: ./data

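Note the structural change here (and in cnn.yaml/lstm.yaml below): region moves out of the nested Model block to the top level, which is where the new dispatch reads it (opt.region, not opt.Model['region']). A sketch of the lookup, assuming the full YAML parses and is loaded into a namespace-style options object (the loader below is illustrative; the project may build opt differently):

    import yaml
    from types import SimpleNamespace

    with open('models_processing/model_koi/bp.yaml') as f:
        cfg = yaml.safe_load(f)

    opt = SimpleNamespace(**cfg)
    print(opt.region)              # 'south' -> consumed by region_loss(opt)
    print(opt.Model['time_step'])  # nested settings stay under the Model block
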
+ 1 - 2
models_processing/model_koi/cnn.yaml

@@ -15,14 +15,13 @@ Model:
   output_size: 16
   patience: 10
   predict_data_fill: true
-  region: south129
   shuffle_train_data: false
   test_data_fill: false
   time_step: 16
   train_data_fill: false
   use_cuda: false
   valid_data_rate: 0.15
-
+region: south
 calculate: []
 cap: 50.0
 dataloc: ./data

+ 156 - 0
models_processing/model_koi/losses.py

@@ -0,0 +1,156 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/5/8 13:15
+# file: losses.py
+# author: David
+# company: shenyang JY
+import tensorflow as tf
+from tensorflow.keras.losses import Loss
+from typeguard import typechecked
+tf.compat.v1.set_random_seed(1234)
+
+
+class Rmse(tf.keras.losses.Loss):
+    """
+    Custom loss function template.
+    Features: name configuration, parameter persistence, and separation of tensor and non-tensor operations.
+    """
+
+    def __init__(self,
+                 name,  # loss name
+                 reduction='sum_over_batch_size',
+                 **kwargs):
+        super().__init__(name=name, reduction=reduction)
+
+        # Custom parameters may be added (serialized automatically)
+        self.param = kwargs.get('param', 1.0)
+
+    def call(self, y_true, y_pred):
+        """核心计算逻辑(分离张量运算和非张量运算)"""
+        # 非张量运算(预处理)
+        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
+
+        # 张量运算(保持计算图兼容性)
+        squared_difference = tf.square(y_pred - y_true)
+        loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
+        return loss
+
+
+class SouthLoss(Loss):
+    """
+    Loss function for the new China Southern Power Grid rules (fully serializable)
+
+    Args:
+        cap (float): normalized installed capacity (must lie in [0, 1])
+        name (str): loss function name, defaults to 'south_loss'
+        reduction (str): loss reduction mode, defaults to 'sum_over_batch_size'
+
+    Example:
+        >>> loss = SouthLoss(cap=0.5)
+        >>> model.compile(loss=loss, optimizer='adam')
+    """
+
+    @typechecked
+    def __init__(self,
+                 cap: float,
+                 name: str = "south_loss",
+                 reduction: str = "sum_over_batch_size"):
+        # Validate arguments
+        if not 0 <= cap <= 1:
+            raise ValueError("cap must be a normalized value in the range [0, 1]")
+
+        super().__init__(name=name, reduction=reduction)
+
+        # Scaling is handled internally (the raw cap is what gets serialized)
+        self._raw_cap = cap  # raw value kept for serialization
+        self.cap = tf.constant(cap * 0.2, dtype=tf.float32)  # value used in computation
+
+    def get_config(self):
+        """获取序列化配置(保存原始 cap 值)"""
+        config = super().get_config()
+        config.update({
+            "cap": self._raw_cap,  # 保存未缩放前的原始值
+            "name": self.name,
+            "reduction": self.reduction
+        })
+        return config
+
+    @classmethod
+    def from_config(cls, config):
+        """反序列化时重建实例"""
+        return cls(
+            cap=config["cap"],
+            name=config["name"],
+            reduction=config["reduction"]
+        )
+
+    def call(self, y_true, y_pred):
+        """计算损失值(带数值稳定化)"""
+        # 确保数据类型一致
+        y_true = tf.cast(y_true, tf.float32)
+        y_pred = tf.cast(y_pred, tf.float32)
+
+        # 数值稳定化处理
+        diff = y_true - y_pred
+        delta = y_true - self.cap
+
+        # 使用稳定化的 sigmoid 计算
+        logistic_input = tf.clip_by_value(10000.0 * delta, -50.0, 50.0)  # 防止梯度爆炸
+        logistic_values = tf.sigmoid(logistic_input)
+
+        # 计算基值
+        base = logistic_values * y_true + (1 - logistic_values) * self.cap
+
+        # 避免除零错误
+        safe_base = tf.where(tf.equal(base, 0.0), 1e-7, base)
+
+        # 计算损失
+        loss = tf.reduce_mean(tf.square(diff / safe_base), axis=-1)
+        return loss
+
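+    # NOTE: call1/call2 below are legacy variants carried over from the
+    # removed loss_cdq.py. They reference self.opt, self.cap01 and K,
+    # none of which are defined in this class; Keras only invokes call(),
+    # so they are kept for reference and are not functional as-is.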
+    def call2(self, y_true, y_predict):
+        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_true = y_true[:, 15]
+        y_predict = y_predict[:, 15]
+        diff = y_true - y_predict
+        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
+        base = logistic_values * y_true + (1 - logistic_values) * self.cap
+        loss = K.square(diff / base)
+
+        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01), tf.greater(y_predict, self.cap01))
+        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
+        safe_count = tf.maximum(count, 1)
+        # reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
+        mean_loss = loss / safe_count
+        return mean_loss
+
+    def call1(self, y_true, y_predict):
+        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true)*self.cap)
+        loss = (y_true - y_predict) / base
+        squared_loss = tf.square(loss)
+        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
+        return  mean_squared_loss
+
+
+
+region_loss_d = {
+    'northeast': lambda region: Rmse(region),
+    'south': lambda cap, region: SouthLoss(cap, region)
+}
+
+
+# Dispatch to the region-specific loss
+def region_loss(opt):
+    handler = region_loss_d.get(opt.region, opt.region)
+    # Check the handler type and execute
+    if callable(handler):
+        # If it is a lambda or a function, call it directly
+        if opt.region == "south":  # regions that need extra arguments
+            return handler(opt.cap, opt.region)
+        else:
+            return handler(opt.region)
+    else:
+        raise TypeError("Invalid loss function")

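A short usage sketch of the new dispatch (the SimpleNamespace stands in for the real options object; field names follow the YAML configs in this commit, and cap is assumed to be already normalized):

    from types import SimpleNamespace
    from models_processing.model_koi.losses import region_loss

    opt = SimpleNamespace(region='south', cap=0.67)  # cap already scaled to [0, 1]
    loss = region_loss(opt)                          # -> SouthLoss(0.67, 'south')
    assert type(loss).__name__ == 'SouthLoss'
    # model.compile(loss=loss, optimizer='adam')     # as done in tf_bp.py below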
+ 1 - 1
models_processing/model_koi/lstm.yaml

@@ -15,13 +15,13 @@ Model:
   output_size: 16
   patience: 10
   predict_data_fill: true
-  region: south129
   shuffle_train_data: false
   test_data_fill: false
   time_step: 16
   train_data_fill: false
   use_cuda: false
   valid_data_rate: 0.15
+region: south
 calculate: []
 cap: 50.0
 dataloc: ./data

+ 8 - 6
models_processing/model_koi/tf_bp.py

@@ -9,7 +9,7 @@ from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Con
 from tensorflow.keras.models import Model, load_model
 from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
 from tensorflow.keras import optimizers, regularizers
-from models_processing.losses.loss_cdq import rmse
+from models_processing.model_koi.losses import region_loss
 from models_processing.model_koi.settings import set_deterministic
 import numpy as np
 from common.database_dml_koi import *
@@ -30,13 +30,14 @@ class BPHandler(object):
         """
         try:
             with model_lock:
-                # NPHandler.model = NPHandler.get_keras_model(opt)
-                self.model = get_h5_model_from_mongo(args, {'rmse': rmse})
+                loss = region_loss(self.opt)
+                self.model = get_h5_model_from_mongo(args, {type(loss).__name__: loss})
         except Exception as e:
             self.logger.info("加载模型权重失败:{}".format(e.args))
 
     @staticmethod
     def get_keras_model(opt):
+        loss = region_loss(opt)
         model = Sequential([
             Dense(64, input_dim=opt.Model['input_size'], activation='relu'),  # input and first hidden layer, 64 units
             Dense(32, activation='relu'),  # hidden layer, 32 units
@@ -44,21 +45,22 @@ class BPHandler(object):
             Dense(1, activation='linear')  # output layer, 1 unit (for regression)
         ])
         adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
-        model.compile(loss=rmse, optimizer=adam)
+        model.compile(loss=loss, optimizer=adam)
         return model
 
     def train_init(self):
         try:
             if self.opt.Model['add_train']:
                 # Incremental training, supports model revision
-                base_train_model = get_h5_model_from_mongo(vars(self.opt), {'rmse': rmse})
+                loss = region_loss(self.opt)
+                base_train_model = get_h5_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:
                 base_train_model = self.get_keras_model(self.opt)
             return base_train_model
         except Exception as e:
-            self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
+            self.logger.info("加载模型权重失败:{}".format(e.args))
 
     def training(self, train_and_valid_data):
         model = self.train_init()

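The custom_objects key changes from the hardcoded 'rmse' to type(loss).__name__ because Keras resolves custom losses by the class name recorded in the saved model config. A sketch of the reload path, mirroring the pattern in get_model above (the file name and opt fields are illustrative):

    from types import SimpleNamespace
    from tensorflow.keras.models import load_model
    from models_processing.model_koi.losses import region_loss

    opt = SimpleNamespace(region='south', cap=0.3)
    loss = region_loss(opt)  # a SouthLoss instance -> key 'SouthLoss'
    model = load_model('bp_model.h5', custom_objects={type(loss).__name__: loss})
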
+ 2 - 1
models_processing/model_koi/tf_bp_pre.py

@@ -54,6 +54,7 @@ def model_prediction_bp():
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
         scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, bp_data=True)
+        bp.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0, 0], 2)
         # ------------ Load model and predict ------------
         bp.get_model(args)
         res = list(chain.from_iterable(target_scaler.inverse_transform([bp.predict(scaled_pre_x).flatten()])))
@@ -91,7 +92,7 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    serve(app, host="0.0.0.0", port=10110, threads=4)
+    serve(app, host="0.0.0.0", port=10110)
     print("server start!")
 
     # ------------------------ test code ------------------------

+ 3 - 2
models_processing/model_koi/tf_bp_train.py

@@ -50,9 +50,10 @@ def model_training_bp():
     try:
         # ------------ Fetch data and preprocess training data ------------
         train_data = get_data_from_mongo(args)
-        train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, bp_data=True)
+        train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data, bp_data=True)
         # ------------ Train the model ------------
         bp.opt.Model['input_size'] = train_x.shape[1]
+        bp.opt.cap = round(scaled_cap, 2)
         bp_model = bp.training([train_x, train_y, valid_x, valid_y])
         # ------------ Save the model ------------
         args['params'] = json.dumps(args)
@@ -80,7 +81,7 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    serve(app, host="0.0.0.0", port=10111, threads=4)
+    serve(app, host="0.0.0.0", port=10111)
     # print("server start!")
     # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
     # 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',

+ 9 - 8
models_processing/model_koi/tf_cnn.py

@@ -10,7 +10,7 @@ from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Con
 from tensorflow.keras.models import Model, load_model
 from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
 from tensorflow.keras import optimizers, regularizers
-from models_processing.losses.loss_cdq import rmse
+from models_processing.model_koi.losses import region_loss
 from models_processing.model_koi.settings import set_deterministic
 import numpy as np
 from common.database_dml_koi import *
@@ -31,15 +31,14 @@ class CNNHandler(object):
         """
         try:
             with model_lock:
-                # NPHandler.model = NPHandler.get_keras_model(opt)
-                self.model = get_h5_model_from_mongo(args, {'rmse': rmse})
+                loss = region_loss(self.opt)
+                self.model = get_h5_model_from_mongo(args, {type(loss).__name__: loss})
         except Exception as e:
             self.logger.info("加载模型权重失败:{}".format(e.args))
 
     @staticmethod
     def get_keras_model(opt):
-        # db_loss = NorthEastLoss(opt)
-        # south_loss = SouthLoss(opt)
+        loss = region_loss(opt)
         l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
         l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
         nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
@@ -53,21 +52,23 @@ class CNNHandler(object):
         model = Model(inputs=nwp_input, outputs=output_f)
         adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
         reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
-        model.compile(loss=rmse, optimizer=adam)
+
+        model.compile(loss=loss, optimizer=adam)
         return model
 
     def train_init(self):
         try:
             if self.opt.Model['add_train']:
                 # Incremental training, supports model revision
-                base_train_model = get_h5_model_from_mongo(vars(self.opt), {'rmse': rmse})
+                loss = region_loss(self.opt)
+                base_train_model = get_h5_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:
                 base_train_model = self.get_keras_model(self.opt)
             return base_train_model
         except Exception as e:
-            self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
+            self.logger.info("加载模型权重失败:{}".format(e.args))
 
     def training(self, train_and_valid_data):
         model = self.train_init()

+ 3 - 1
models_processing/model_koi/tf_cnn_pre.py

@@ -53,6 +53,8 @@ def model_prediction_bp():
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
         scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler)
+        cnn.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0,0], 2)
+        logger.info("---------cap归一化:{}".format(cnn.opt.cap))
         cnn.get_model(args)
         # result = bp.predict(scaled_pre_x, args)
         res = list(chain.from_iterable(target_scaler.inverse_transform([cnn.predict(scaled_pre_x).flatten()])))
@@ -90,7 +92,7 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    serve(app, host="0.0.0.0", port=10112, threads=4)
+    serve(app, host="0.0.0.0", port=10112)
     print("server start!")
 
     # ------------------------ test code ------------------------

+ 5 - 2
models_processing/model_koi/tf_cnn_train.py

@@ -50,9 +50,12 @@ def model_training_bp():
     try:
         # ------------ Fetch data and preprocess training data ------------
         train_data = get_data_from_mongo(args)
-        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data)
+        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
         # ------------ Train and save the model ------------
         cnn.opt.Model['input_size'] = train_x.shape[2]
+        cnn.opt.cap = round(scaled_cap, 2)
+        logger.info("---------cap归一化:{}".format(cnn.opt.cap))
+
         bp_model = cnn.training([train_x, valid_x, train_y, valid_y])
 
         args['params'] = json.dumps(args)
@@ -81,7 +84,7 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    serve(app, host="0.0.0.0", port=10113, threads=4)
+    serve(app, host="0.0.0.0", port=10113)
     # print("server start!")
     # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
     # 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',

+ 8 - 7
models_processing/model_koi/tf_lstm.py

@@ -9,7 +9,7 @@ from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Con
 from tensorflow.keras.models import Model, load_model
 from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
 from tensorflow.keras import optimizers, regularizers
-from models_processing.losses.loss_cdq import rmse
+from models_processing.model_koi.losses import region_loss
 import numpy as np
 from common.database_dml_koi import *
 from models_processing.model_koi.settings import set_deterministic
@@ -30,14 +30,14 @@ class TSHandler(object):
         """
         try:
             with model_lock:
-                self.model = get_h5_model_from_mongo(args, {'rmse': rmse})
+                loss = region_loss(self.opt)
+                self.model = get_h5_model_from_mongo(args, {type(loss).__name__: loss})
         except Exception as e:
             self.logger.info("加载模型权重失败:{}".format(e.args))
 
     @staticmethod
     def get_keras_model(opt):
-        # db_loss = NorthEastLoss(opt)
-        # south_loss = SouthLoss(opt)
+        loss = region_loss(opt)
         l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
         l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
         nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
@@ -50,21 +50,22 @@ class TSHandler(object):
 
         model = Model(nwp_input, output)
         adam = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
-        model.compile(loss=rmse, optimizer=adam)
+        model.compile(loss=loss, optimizer=adam)
         return model
 
     def train_init(self):
         try:
             if self.opt.Model['add_train']:
                 # Incremental training, supports model revision
-                base_train_model = get_h5_model_from_mongo(vars(self.opt), {'rmse': rmse})
+                loss = region_loss(self.opt)
+                base_train_model = get_h5_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:
                 base_train_model = self.get_keras_model(self.opt)
             return base_train_model
         except Exception as e:
-            self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
+            self.logger.info("加载模型权重失败:{}".format(e.args))
 
     def training(self, train_and_valid_data):
         model = self.train_init()

+ 2 - 1
models_processing/model_koi/tf_lstm_pre.py

@@ -53,6 +53,7 @@ def model_prediction_bp():
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
         scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler)
+        ts.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0, 0], 2)
         ts.get_model(args)
         # result = bp.predict(scaled_pre_x, args)
         res = list(chain.from_iterable(target_scaler.inverse_transform([ts.predict(scaled_pre_x).flatten()])))
@@ -90,7 +91,7 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    serve(app, host="0.0.0.0", port=10114, threads=4)
+    serve(app, host="0.0.0.0", port=10114)
     print("server start!")
 
     # ------------------------ test code ------------------------

+ 3 - 2
models_processing/model_koi/tf_lstm_train.py

@@ -48,9 +48,10 @@ def model_training_bp():
     try:
         # ------------ Fetch data and preprocess training data ------------
         train_data = get_data_from_mongo(args)
-        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data)
+        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
         # ------------ Train and save the model ------------
         ts.opt.Model['input_size'] = train_x.shape[2]
+        ts.opt.cap = round(scaled_cap, 2)
         ts_model = ts.training([train_x, valid_x, train_y, valid_y])
 
         args['params'] = json.dumps(args)
@@ -79,7 +80,7 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    serve(app, host="0.0.0.0", port=10115, threads=4)
+    serve(app, host="0.0.0.0", port=10115)
     print("server start!")
     # args_dict = {"mongodb_database": 'realtimeDq', 'scaler_table': 'j00600_scaler', 'model_name': 'lstm1',
     # 'model_table': 'j00600_model', 'mongodb_read_table': 'j00600', 'col_time': 'dateTime',