#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/5/8 13:15
# file: loss.py
# author: David
# company: shenyang JY

import tensorflow as tf
from tensorflow.keras.losses import Loss
from typeguard import typechecked

tf.compat.v1.set_random_seed(1234)


class MSE(tf.keras.losses.Loss):
    """
    Custom loss-function template.
    Demonstrates name handling, parameter saving, and keeping tensor ops
    separate from non-tensor preprocessing.
    """
    def __init__(self,
                 name,                               # loss name
                 reduction="sum_over_batch_size",    # tf.keras has no 'mean' reduction; this is the equivalent
                 **kwargs):
        super().__init__(name=name, reduction=reduction)
        # Custom parameters can be added here (serialized automatically)
        self.param = kwargs.get('param', 1.0)

    def call(self, y_true, y_pred):
        """Core computation (non-tensor preprocessing kept apart from tensor ops)."""
        # Non-tensor preprocessing
        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
        # Tensor ops (keeps the computation graph-compatible)
        squared_difference = tf.square(y_pred - y_true)
        loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
        return loss


class RMSE(Loss):
    """Root Mean Squared Error loss (works for single- and multi-variable regression)."""
    def __init__(self,
                 name,
                 reduction="sum_over_batch_size",  # default; equivalent to a mean over the batch
                 **kwargs):
        super().__init__(name=name, reduction=reduction)

    def call(self, y_true, y_pred):
        """
        Computation:
        1. squared difference between prediction and ground truth
        2. mean along the last (feature) axis
        3. square root of each sample's loss
        """
        # Make the inputs compatible (tensor conversion and dtype cast)
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        # Mean squared error along the last axis
        mse_per_sample = tf.reduce_mean(tf.square(y_pred - y_true), axis=-1)
        # Square root of each sample's MSE gives the per-sample RMSE
        rmse_per_sample = tf.sqrt(mse_per_sample)
        return rmse_per_sample

    def get_config(self):
        """Serialization support (for model saving/loading)."""
        base_config = super().get_config()
        return base_config
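
# RMSE.call returns one square root per sample, so the Keras reduction averages
# those roots over the batch; that is not the same number as the "global" RMSE
# sqrt(mean(all squared errors)). A minimal sketch of the difference, using
# made-up tensors (not part of the original tests):
def _rmse_reduction_demo():
    y_true = tf.constant([[0.0], [0.0]])
    y_pred = tf.constant([[1.0], [3.0]])
    # per-sample roots are [1, 3]; reduction gives their mean: 2.0
    per_sample_mean = RMSE(name='rmse_demo')(y_true, y_pred)
    # global RMSE over all elements: sqrt((1 + 9) / 2) = sqrt(5) ~ 2.236
    global_rmse = tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))
    return per_sample_mean, global_rmse
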
class SouthLoss(Loss):
    """
    Loss function implementing the new Southern Power Grid rules (fully serializable).

    Args:
        cap (float): normalized installed capacity (expected to lie in [0, 1])
        name (str): loss name, defaults to 'south_loss'
        reduction (str): reduction mode, defaults to 'sum_over_batch_size'

    Example:
        >>> loss = SouthLoss(cap=0.5)
        >>> model.compile(loss=loss, optimizer='adam')
    """
    @typechecked
    def __init__(self,
                 cap: float,
                 name: str = "south_loss",
                 reduction: str = "sum_over_batch_size"):
        # Parameter validation (currently disabled)
        # if not 0 <= cap <= 1:
        #     raise ValueError("cap must be normalized and lie in [0, 1]")
        super().__init__(name=name, reduction=reduction)
        # Scaling is handled internally (the original cap is what gets serialized)
        self._raw_cap = cap                                   # raw value, kept for serialization
        self.cap = tf.constant(cap * 0.2, dtype=tf.float32)   # value actually used in the computation

    def get_config(self):
        """Serialization config (stores the original cap value)."""
        config = super().get_config()
        config.update({
            "cap": self._raw_cap,  # the unscaled, original value
            "name": self.name,
            "reduction": self.reduction
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the instance when deserializing."""
        return cls(
            cap=config["cap"],
            name=config["name"],
            reduction=config["reduction"]
        )

    def call(self, y_true, y_pred):
        """Compute the loss (with numerical stabilization)."""
        # Make dtypes consistent
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)

        diff = y_true - y_pred
        delta = y_true - self.cap

        # Stabilized sigmoid: clip the input to avoid saturated/exploding gradients
        logistic_input = tf.clip_by_value(10000.0 * delta, -50.0, 50.0)
        logistic_values = tf.sigmoid(logistic_input)

        # Base value: smoothly switches between y_true (above cap) and cap (below)
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        # Guard against division by zero
        safe_base = tf.where(tf.equal(base, 0.0), 1e-7, base)

        loss = tf.reduce_mean(tf.square(diff / safe_base), axis=-1)
        return loss

    def call2(self, y_true, y_predict):
        # Legacy variant, not invoked by Keras (which calls `call`). It assumes
        # `self.opt` (normalization stats) and `self.cap01` are attached to the
        # instance externally before use; neither is set in __init__.
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_true = y_true[:, 15]
        y_predict = y_predict[:, 15]
        diff = y_true - y_predict
        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        loss = tf.square(diff / base)
        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01),
                                      tf.greater(y_predict, self.cap01))
        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
        safe_count = tf.maximum(count, 1)
        # reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
        mean_loss = loss / safe_count
        return mean_loss

    def call1(self, y_true, y_predict):
        # Legacy variant; like call2 it expects `self.opt` to be attached externally.
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true) * self.cap)
        loss = (y_true - y_predict) / base
        squared_loss = tf.square(loss)
        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
        return mean_squared_loss


region_loss_d = {
    'northeast': lambda region: RMSE(region),
    'south': lambda cap, region: SouthLoss(cap, region)
}


# Pick the loss for a region
def region_loss(opt):
    # Fall back to the northeast handler for unknown regions
    handler = region_loss_d.get(opt.region, region_loss_d['northeast'])
    if callable(handler):
        if opt.region == "south":  # regions whose loss needs extra arguments
            return handler(opt.cap, opt.region)
        else:
            return handler(opt.region)
    else:
        raise TypeError("invalid loss-function handler")


if __name__ == '__main__':
    # Test data
    y_true = tf.constant([[1.0], [2.0], [3.0]], dtype=tf.float32)
    y_pred = tf.constant([[1.5], [2.5], [3.5]], dtype=tf.float32)

    # Built-in MSE
    mse = tf.keras.losses.MeanSquaredError()(y_true, y_pred).numpy()
    # Custom loss (equivalent to MSE when param == 1)
    custom_mse = MSE(name='test')(y_true, y_pred).numpy()
    print("built-in MSE:", mse)       # prints: 0.25
    print("custom MSE:", custom_mse)  # should print: 0.25
    assert abs(mse - custom_mse) < 1e-6

    # Variable and optimizer for the gradient check
    y_pred_var = tf.Variable([[1.5], [2.5], [3.5]], dtype=tf.float32)
    optimizer = tf.keras.optimizers.Adam()

    with tf.GradientTape() as tape:
        loss = MSE(name='test')(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    # Analytic gradient: 2 * (y_pred - y_true) / N  (N = 3)
    expected_grads = 2 * (y_pred_var - y_true) / 3.0
    print("actual gradients:\n", grads.numpy())
    print("expected gradients:\n", expected_grads.numpy())

    with tf.GradientTape() as tape:
        loss = tf.keras.losses.MeanSquaredError()(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    print("actual gradients (built-in MSE):\n", grads.numpy())
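
    # Additional sanity checks, a sketch in the spirit of the tests above:
    # round-trip SouthLoss through get_config/from_config to confirm that the
    # unscaled cap is what gets serialized and that the rebuilt instance
    # reproduces the same loss value, then exercise the region_loss dispatch
    # with a minimal stand-in for the opt object (the real `opt` presumably
    # carries more fields; region/cap suffice here).
    south = SouthLoss(cap=0.5)
    restored = SouthLoss.from_config(south.get_config())
    v1 = south(y_true, y_pred).numpy()
    v2 = restored(y_true, y_pred).numpy()
    print("SouthLoss before/after config round-trip:", v1, v2)
    assert abs(v1 - v2) < 1e-6

    from types import SimpleNamespace
    print("northeast ->", type(region_loss(SimpleNamespace(region='northeast'))).__name__)
    print("south     ->", type(region_loss(SimpleNamespace(region='south', cap=0.5))).__name__)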