#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/5/8 13:15
# file: losses.py
# author: David
# company: shenyang JY
import tensorflow as tf
from tensorflow.keras.losses import Loss
from typeguard import typechecked

tf.random.set_seed(1234)

class MSE(tf.keras.losses.Loss):
    """
    Template for a custom loss function.
    Demonstrates: naming, parameter storage, and separating tensor ops
    from non-tensor ops.
    """
    def __init__(self,
                 name,                             # loss name
                 reduction="sum_over_batch_size",  # note: 'mean' is not a valid tf.keras reduction
                 **kwargs):
        super().__init__(name=name, reduction=reduction)
        # Custom parameters go here (serialize them via get_config)
        self.param = kwargs.get('param', 1.0)

    def call(self, y_true, y_pred):
        """Core computation (non-tensor preprocessing separated from tensor ops)."""
        # Non-tensor op (preprocessing): wrap the Python float as a constant
        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
        # Tensor ops (keep the computation graph-compatible)
        squared_difference = tf.square(y_pred - y_true)
        loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
        return loss
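
    # Minimal serialization sketch (an addition, not part of the original file):
    # without this, `param` would be lost on model save/load, since tf.keras
    # only serializes `name` and `reduction` by default.
    def get_config(self):
        config = super().get_config()
        config.update({"param": self.param})
        return config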


class RMSE(Loss):
    """Root Mean Squared Error loss (handles both univariate and multivariate regression)."""
    def __init__(self,
                 name,
                 reduction="sum_over_batch_size",  # default; equivalent to a mean over the batch
                 **kwargs):
        super().__init__(name=name, reduction=reduction)

    def call(self, y_true, y_pred):
        """
        Computation:
        1. squared difference between predictions and targets
        2. mean along the last (feature) axis
        3. square root of each sample's mean squared error
        """
        # Make the inputs' dtypes compatible
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        # Mean squared error along the last axis
        mse_per_sample = tf.reduce_mean(tf.square(y_pred - y_true), axis=-1)
        # Per-sample RMSE
        rmse_per_sample = tf.sqrt(mse_per_sample)
        return rmse_per_sample

    def get_config(self):
        """Serialization support (for model saving/loading)."""
        base_config = super().get_config()
        return base_config
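
# Worked example for RMSE (an addition, as a commented doctest):
# >>> rmse = RMSE(name='rmse')
# >>> rmse(tf.constant([[0.0, 0.0]]), tf.constant([[3.0, 4.0]])).numpy()
# per-sample MSE = (9 + 16) / 2 = 12.5, so RMSE = sqrt(12.5) ≈ 3.5355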


class SouthLoss(Loss):
    """
    Loss implementing the new China Southern Power Grid assessment rule
    (fully serializable).

    Args:
        cap (float): normalized installed capacity (expected to lie in [0, 1])
        name (str): loss name, defaults to 'south_loss'
        reduction (str): reduction mode, defaults to 'sum_over_batch_size'

    Example:
        # >>> loss = SouthLoss(cap=0.5)
        # >>> model.compile(loss=loss, optimizer='adam')
    """
    @typechecked
    def __init__(self,
                 cap: float,
                 name: str = "south_loss",
                 reduction: str = "sum_over_batch_size"):
        # Parameter validation
        # if not 0 <= cap <= 1:
        #     raise ValueError("cap must be a normalized value in [0, 1]")
        super().__init__(name=name, reduction=reduction)
        # Scaling is handled internally (the raw cap is kept for serialization)
        self._raw_cap = cap  # raw value, used by get_config
        self.cap = tf.constant(cap * 0.2, dtype=tf.float32)  # value actually used in the computation

    def get_config(self):
        """Serialization config (stores the raw, unscaled cap)."""
        config = super().get_config()
        config.update({
            "cap": self._raw_cap,  # the unscaled original value
            "name": self.name,
            "reduction": self.reduction
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild an instance during deserialization."""
        return cls(
            cap=config["cap"],
            name=config["name"],
            reduction=config["reduction"]
        )

    def call(self, y_true, y_pred):
        """Compute the loss (with numerical stabilization)."""
        # Ensure consistent dtypes
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        # Numerical stabilization
        diff = y_true - y_pred
        delta = y_true - self.cap
        # Stabilized sigmoid: a very steep sigmoid acts as a differentiable
        # indicator for "y_true exceeds cap"
        logistic_input = tf.clip_by_value(10000.0 * delta, -50.0, 50.0)  # prevent gradient blow-up
        logistic_values = tf.sigmoid(logistic_input)
        # Denominator: approximately y_true above cap, otherwise cap
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        # Guard against division by zero
        safe_base = tf.where(tf.equal(base, 0.0), 1e-7, base)
        # Relative squared error, averaged over the last axis
        loss = tf.reduce_mean(tf.square(diff / safe_base), axis=-1)
        return loss

    def call2(self, y_true, y_predict):
        """Alternative formulation kept for reference. Assumes `self.opt`
        (normalization statistics) and `self.cap01` (a secondary threshold)
        are attached to the instance externally; neither is set in __init__.
        """
        # De-normalize back to physical units
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        # Evaluate only the 16th time step
        y_true = y_true[:, 15]
        y_predict = y_predict[:, 15]
        diff = y_true - y_predict
        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        loss = tf.square(diff / base)
        # Count samples where both target and prediction exceed the threshold;
        # note that, as written, the mask only enters through this denominator
        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01), tf.greater(y_predict, self.cap01))
        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
        safe_count = tf.maximum(count, 1)
        # reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
        mean_loss = loss / safe_count
        return mean_loss

    def call1(self, y_true, y_predict):
        """Simpler variant: relative squared error with a hard cap on the
        denominator. Also assumes `self.opt` is attached externally.
        """
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        # Denominator: max(y_true, cap), via a hard (non-smooth) tf.where
        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true) * self.cap)
        loss = (y_true - y_predict) / base
        squared_loss = tf.square(loss)
        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
        return mean_squared_loss


region_loss_d = {
    'northeast': lambda region: RMSE(region),
    'south': lambda cap, region: SouthLoss(cap, region)
}


# Dispatch to the loss that matches the region
def region_loss(opt):
    # Fall back to the northeast handler for unknown regions
    # (the original default was the string 'northeast', which is not callable)
    handler = region_loss_d.get(opt.region, region_loss_d['northeast'])
    # Check the handler type before dispatching
    if callable(handler):
        # lambdas/functions are called directly
        if opt.region == "south":  # regions that need extra arguments
            return handler(opt.cap, opt.region)
        else:
            return handler(opt.region)
    else:
        raise TypeError("invalid loss function")
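
# Usage sketch (an addition; `opt` here is a stand-in for whatever options
# object the caller passes, assumed to expose `region` and `cap`):
# >>> from types import SimpleNamespace
# >>> opt = SimpleNamespace(region='south', cap=0.5)
# >>> loss_fn = region_loss(opt)   # -> SouthLoss(cap=0.5, name='south')
# >>> model.compile(loss=loss_fn, optimizer='adam')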


if __name__ == '__main__':
    # Test data
    y_true = tf.constant([[1.0], [2.0], [3.0]], dtype=tf.float32)
    y_pred = tf.constant([[1.5], [2.5], [3.5]], dtype=tf.float32)

    # Reference MSE
    mse = tf.keras.losses.MeanSquaredError()(y_true, y_pred).numpy()
    # Custom loss (equivalent to MSE when the coefficient is 1)
    custom_mse = MSE(name='test')(y_true, y_pred).numpy()
    print("reference MSE:", mse)      # prints: 0.25
    print("custom MSE:", custom_mse)  # should print: 0.25
    assert abs(mse - custom_mse) < 1e-6

    # Variable and optimizer for a gradient check
    y_pred_var = tf.Variable([[1.5], [2.5], [3.5]], dtype=tf.float32)
    optimizer = tf.keras.optimizers.Adam()
    with tf.GradientTape() as tape:
        loss = MSE(name='test')(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    # Analytical gradient: 2 * (y_pred - y_true) / N  (here N = 3)
    expected_grads = 2 * (y_pred_var - y_true) / 3.0
    print("actual gradients:\n", grads.numpy())
    print("expected gradients:\n", expected_grads.numpy())

    with tf.GradientTape() as tape:
        loss = tf.keras.losses.MeanSquaredError()(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    print("actual gradients (built-in MSE):\n", grads.numpy())
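
    # Extra sanity checks (an addition, not in the original): RMSE should be
    # the square root of the per-sample MSE, and SouthLoss should run end to end.
    rmse = RMSE(name='rmse_test')(y_true, y_pred).numpy()
    print("RMSE:", rmse)  # sqrt(0.25) = 0.5 for this data
    south = SouthLoss(cap=0.5)(y_true, y_pred).numpy()
    print("SouthLoss:", south)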