#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/5/8 13:15
# file: loss.py
# author: David
# company: shenyang JY

import tensorflow as tf
from tensorflow.keras.losses import Loss
from typeguard import typechecked

tf.compat.v1.set_random_seed(1234)

class MSE(tf.keras.losses.Loss):
    """
    Template for a custom loss function.
    Demonstrates name handling, parameter persistence, and separating
    tensor ops from Python-level preprocessing.
    """
    def __init__(self,
                 name,                              # loss name
                 reduction="sum_over_batch_size",   # averages over the batch (tf.keras has no 'mean' reduction)
                 **kwargs):
        super().__init__(name=name, reduction=reduction)
        # Custom parameters go here (persisted via get_config below)
        self.param = kwargs.get('param', 1.0)

    def call(self, y_true, y_pred):
        """Core computation: per-sample MSE scaled by self.param."""
        # Lift the Python-level parameter into the graph with a matching dtype
        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
        # Tensor ops (kept graph-compatible)
        squared_difference = tf.square(y_pred - y_true)
        loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
        return loss

    def get_config(self):
        """Persist the custom parameter for model save/load."""
        config = super().get_config()
        config.update({"param": self.param})
        return config

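# Minimal usage sketch for MSE (the tiny model and data below are
# illustrative only, not part of the original training code): the custom
# loss drops into model.compile like any built-in loss.
def _demo_mse_usage():
    model = tf.keras.Sequential([
        tf.keras.Input(shape=(4,)),
        tf.keras.layers.Dense(1),
    ])
    model.compile(optimizer='adam', loss=MSE(name='scaled_mse', param=1.0))
    x = tf.random.normal((8, 4))
    y = tf.random.normal((8, 1))
    model.fit(x, y, epochs=1, verbose=0)
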
class RMSE(Loss):
    """Root Mean Squared Error loss (works for single- and multi-output regression)."""
    def __init__(self,
                 name,
                 reduction="sum_over_batch_size",  # default: average the per-sample losses over the batch
                 **kwargs):
        super().__init__(name=name, reduction=reduction)

    def call(self, y_true, y_pred):
        """
        Computation:
        1. Squared difference between predictions and targets.
        2. Mean along the last (feature) axis.
        3. Square root of each sample's mean, giving a per-sample RMSE.
        """
        # Make the inputs dtype-compatible
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        # Mean squared error along the last axis
        mse_per_sample = tf.reduce_mean(tf.square(y_pred - y_true), axis=-1)
        # Square root of each sample's MSE gives its RMSE
        rmse_per_sample = tf.sqrt(mse_per_sample)
        return rmse_per_sample

    def get_config(self):
        """Serialization support (for model save/load)."""
        base_config = super().get_config()
        return base_config

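# Note the reduction order in RMSE: it takes the square root per sample and
# then lets Keras average across the batch, which generally differs from the
# global RMSE sqrt(mean of all squared errors). A quick sketch of the
# difference (the values below are illustrative only):
def _demo_rmse_reduction_order():
    y_true = tf.constant([[0.0, 0.0], [0.0, 0.0]])
    y_pred = tf.constant([[1.0, 1.0], [3.0, 3.0]])
    per_sample_then_mean = RMSE(name='rmse')(y_true, y_pred)           # (1 + 3) / 2 = 2.0
    global_rmse = tf.sqrt(tf.reduce_mean(tf.square(y_pred - y_true)))  # sqrt(5) ~ 2.236
    print(float(per_sample_then_mean), float(global_rmse))
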
class SouthLoss(Loss):
    """
    Loss for the South Grid's new assessment rules (fully serializable).

    Args:
        cap (float): normalized installed capacity (expected to lie in [0, 1])
        name (str): loss name, defaults to 'south_loss'
        reduction (str): reduction mode, defaults to 'sum_over_batch_size'

    Example:
        >>> loss = SouthLoss(cap=0.5)
        >>> model.compile(loss=loss, optimizer='adam')
    """
    @typechecked
    def __init__(self,
                 cap: float,
                 name: str = "south_loss",
                 reduction: str = "sum_over_batch_size"):
        # Parameter validation (disabled in the original code)
        # if not 0 <= cap <= 1:
        #     raise ValueError("cap must be normalized and lie in [0, 1]")
        super().__init__(name=name, reduction=reduction)
        # Scale internally, but keep the raw cap for serialization
        self._raw_cap = cap                                   # raw value, used by get_config
        self.cap = tf.constant(cap * 0.2, dtype=tf.float32)   # value actually used in the computation

    def get_config(self):
        """Serialization config (stores the unscaled cap value)."""
        config = super().get_config()
        config.update({
            "cap": self._raw_cap,  # the value before internal scaling
            "name": self.name,
            "reduction": self.reduction
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the instance on deserialization."""
        return cls(
            cap=config["cap"],
            name=config["name"],
            reduction=config["reduction"]
        )

    def call(self, y_true, y_pred):
        """Compute the loss with numerical stabilization."""
        # Keep dtypes consistent
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        diff = y_true - y_pred
        delta = y_true - self.cap
        # Stabilized sigmoid: clip the steep input so it cannot overflow
        logistic_input = tf.clip_by_value(10000.0 * delta, -50.0, 50.0)
        logistic_values = tf.sigmoid(logistic_input)
        # Denominator: y_true above the cap, the cap itself below it
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        # Guard against division by zero
        safe_base = tf.where(tf.equal(base, 0.0), 1e-7, base)
        # Relative squared error, averaged over the feature axis
        loss = tf.reduce_mean(tf.square(diff / safe_base), axis=-1)
        return loss

    def call2(self, y_true, y_predict):
        # Legacy variant kept for reference. It assumes `self.opt` (holding
        # de-normalization stats) and `self.cap01` are attached to the
        # instance externally; neither is set in __init__.
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_true = y_true[:, 15]
        y_predict = y_predict[:, 15]
        diff = y_true - y_predict
        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        loss = tf.square(diff / base)
        # Count the samples where both truth and prediction exceed the cap01 threshold
        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01), tf.greater(y_predict, self.cap01))
        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
        safe_count = tf.maximum(count, 1)
        # Sum the per-sample losses before dividing by the count
        reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
        mean_loss = reduce_sum_loss / safe_count
        return mean_loss

    def call1(self, y_true, y_predict):
        # Legacy variant: de-normalize, then divide the error by max(y_true, cap).
        # Like call2, it assumes `self.opt` is attached externally.
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true) * self.cap)
        loss = (y_true - y_predict) / base
        squared_loss = tf.square(loss)
        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
        return mean_squared_loss

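# Serialization round-trip sketch for SouthLoss (the inputs below are
# illustrative): get_config stores the unscaled cap, so a restored instance
# rebuilds the same scaled constant and produces the same loss value.
def _demo_southloss_roundtrip():
    loss = SouthLoss(cap=0.5)
    restored = SouthLoss.from_config(loss.get_config())
    assert restored._raw_cap == loss._raw_cap
    y_true = tf.constant([[0.3, 0.6]])
    y_pred = tf.constant([[0.2, 0.5]])
    tf.debugging.assert_near(loss(y_true, y_pred), restored(y_true, y_pred))
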
region_loss_d = {
    'northeast': lambda region: RMSE(region),
    'south': lambda cap, region: SouthLoss(cap, region)
}


def region_loss(opt):
    """Dispatch to the loss constructor that matches opt.region."""
    # Fall back to the 'northeast' handler itself, not the string 'northeast'
    handler = region_loss_d.get(opt.region, region_loss_d['northeast'])
    if callable(handler):
        # Lambdas/functions are called directly
        if opt.region == "south":  # regions whose loss needs extra arguments
            return handler(opt.cap, opt.region)
        else:
            return handler(opt.region)
    else:
        raise TypeError("invalid loss function")

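# Usage sketch for region_loss. The SimpleNamespace objects stand in for the
# real option object; the exact fields (`region`, `cap`) are assumed here.
def _demo_region_loss():
    from types import SimpleNamespace
    south_opt = SimpleNamespace(region='south', cap=0.5)
    northeast_opt = SimpleNamespace(region='northeast', cap=None)
    assert isinstance(region_loss(south_opt), SouthLoss)
    assert isinstance(region_loss(northeast_opt), RMSE)
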
if __name__ == '__main__':
    # Test data
    y_true = tf.constant([[1.0], [2.0], [3.0]], dtype=tf.float32)
    y_pred = tf.constant([[1.5], [2.5], [3.5]], dtype=tf.float32)

    # Built-in MSE
    mse = tf.keras.losses.MeanSquaredError()(y_true, y_pred).numpy()
    # Custom loss (equivalent to MSE when param=1)
    custom_mse = MSE(name='test')(y_true, y_pred).numpy()
    print("built-in MSE:", mse)        # expected: 0.25
    print("custom MSE:", custom_mse)   # expected: 0.25
    assert abs(mse - custom_mse) < 1e-6

    # Variable and optimizer for the gradient check
    y_pred_var = tf.Variable([[1.5], [2.5], [3.5]], dtype=tf.float32)
    optimizer = tf.keras.optimizers.Adam()
    with tf.GradientTape() as tape:
        loss = MSE(name='test')(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    # Analytic gradient: 2 * (y_pred - y_true) / N  (N = 3)
    expected_grads = 2 * (y_pred_var - y_true) / 3.0
    print("actual gradients:\n", grads.numpy())
    print("expected gradients:\n", expected_grads.numpy())

    with tf.GradientTape() as tape:
        loss = tf.keras.losses.MeanSquaredError()(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    print("built-in MSE gradients:\n", grads.numpy())