#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/5/8 13:15
# file: losses.py
# author: David
# company: shenyang JY
import tensorflow as tf
from tensorflow.keras.losses import Loss
from typeguard import typechecked

tf.compat.v1.set_random_seed(1234)


class MSE(tf.keras.losses.Loss):
    """
    Custom loss-function template.
    Features: configurable name, saved parameters, and separation of
    tensor ops from non-tensor ops.
    """
    def __init__(self,
                 name,                             # loss name
                 reduction="sum_over_batch_size",  # equivalent to taking the mean
                 **kwargs):
        super().__init__(name=name, reduction=reduction)
        # Custom parameters go here (serialize them in get_config if the
        # loss needs to survive model save/load)
        self.param = kwargs.get('param', 1.0)

    def call(self, y_true, y_pred):
        """Core computation (tensor ops kept separate from non-tensor ops)."""
        # Non-tensor op (preprocessing)
        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
        # Tensor ops (graph-compatible)
        squared_difference = tf.square(y_pred - y_true)
        loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
        return loss

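# Usage sketch (illustrative only; `model` is assumed to be a ready-to-compile
# Keras model, and param=1.0 should reproduce the built-in MSE):
# >>> model.compile(optimizer='adam', loss=MSE(name='mse', param=1.0))
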

class MSE_ZONE(tf.keras.losses.Loss):
    """
    Loss for zone-level modelling:
    per-zone MSE plus a consistency penalty that ties the predicted
    zone total to the true zone total.
    """
    def __init__(self,
                 name,                             # loss name
                 reduction="sum_over_batch_size",  # equivalent to taking the mean
                 **kwargs):
        super().__init__(name=name, reduction=reduction)
        # Custom parameters go here (serialize them in get_config if needed)
        self.param = kwargs.get('param', 1.0)

    def call(self, y_true, y_pred):
        """Core computation (tensor ops kept separate from non-tensor ops)."""
        # Non-tensor op (preprocessing)
        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
        # Tensor ops (graph-compatible)
        squared_difference = tf.square(y_pred - y_true)
        mse_loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
        # Penalty on the mismatch between summed prediction and summed target
        # along the last (zone) axis
        sum_loss = tf.reduce_mean(tf.square(tf.reduce_sum(y_pred, axis=-1) - tf.reduce_sum(y_true, axis=-1)))
        loss = mse_loss + 0.2 * sum_loss
        return loss

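# Quick numeric sketch (hypothetical 2-zone batch): when the zone totals
# match, the 0.2-weighted sum term vanishes and only the per-zone MSE remains.
# >>> yt = tf.constant([[0.4, 0.6]]); yp = tf.constant([[0.5, 0.5]])
# >>> MSE_ZONE(name='zone')(yt, yp)  # totals both 1.0 -> pure MSE term (0.01)
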

class RMSE(Loss):
    """Root-mean-squared-error loss (works for single- and multi-variable regression)."""
    def __init__(self,
                 name,
                 reduction="sum_over_batch_size",  # default reduction (equivalent to mean)
                 **kwargs):
        super().__init__(name=name, reduction=reduction)

    def call(self, y_true, y_pred):
        """
        Computation:
        1. squared difference between prediction and target
        2. mean along the last (feature) axis
        3. square root of each sample's loss
        """
        # Make the inputs compatible (tensor conversion and dtype cast)
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        # Mean squared error along the last axis
        mse_per_sample = tf.reduce_mean(tf.square(y_pred - y_true), axis=-1)
        # Square root of each sample's MSE gives its RMSE
        rmse_per_sample = tf.sqrt(mse_per_sample)
        return rmse_per_sample

    def get_config(self):
        """Serialization support (for model save/load)."""
        base_config = super().get_config()
        return base_config

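# Worked example (values chosen for easy arithmetic): the per-sample RMSE is
# the square root of the mean squared error along the feature axis.
# >>> RMSE(name='rmse')(tf.constant([[0.0, 0.0]]), tf.constant([[3.0, 4.0]]))
# >>> # sqrt(mean([9.0, 16.0])) = sqrt(12.5) ≈ 3.5355
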

class SouthLoss(Loss):
    """
    Loss for the new Southern Grid (南网) assessment rule (fully serializable).

    Args:
        cap (float): normalised installed capacity (expected in [0, 1])
        name (str): loss name, defaults to 'south_loss'
        reduction (str): loss reduction, defaults to 'sum_over_batch_size'

    Example:
        # >>> loss = SouthLoss(cap=0.5)
        # >>> model.compile(loss=loss, optimizer='adam')
    """
    @typechecked
    def __init__(self,
                 cap: float,
                 name: str = "south_loss",
                 reduction: str = "sum_over_batch_size"):
        # Parameter validation
        # if not 0 <= cap <= 1:
        #     raise ValueError("cap must be a normalised value in [0, 1]")
        super().__init__(name=name, reduction=reduction)
        # Scaling is handled internally (the raw cap is what gets serialized)
        self._raw_cap = cap                                  # raw value, kept for serialization
        self.cap = tf.constant(cap * 0.2, dtype=tf.float32)  # value used in the computation

    def get_config(self):
        """Serialization config (stores the raw, unscaled cap)."""
        config = super().get_config()
        config.update({
            "cap": self._raw_cap,  # unscaled original value
            "name": self.name,
            "reduction": self.reduction
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the instance when deserializing."""
        return cls(
            cap=config["cap"],
            name=config["name"],
            reduction=config["reduction"]
        )

    def call(self, y_true, y_pred):
        """Compute the loss (with numerical stabilisation)."""
        # Ensure consistent dtypes
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        diff = y_true - y_pred
        delta = y_true - self.cap
        # Numerically stabilised sigmoid gate (clipping prevents gradient blow-up)
        logistic_input = tf.clip_by_value(10000.0 * delta, -50.0, 50.0)
        logistic_values = tf.sigmoid(logistic_input)
        # Denominator: a smooth approximation of max(y_true, cap)
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        # Guard against division by zero
        safe_base = tf.where(tf.equal(base, 0.0), 1e-7, base)
        # Relative squared error
        loss = tf.reduce_mean(tf.square(diff / safe_base), axis=-1)
        return loss

    def call2(self, y_true, y_predict):
        # Legacy variant: assumes `self.opt` (normalisation statistics) and
        # `self.cap01` have been attached to the instance externally.
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_true = y_true[:, 15]
        y_predict = y_predict[:, 15]
        diff = y_true - y_predict
        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        loss = tf.square(diff / base)
        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01), tf.greater(y_predict, self.cap01))
        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
        safe_count = tf.maximum(count, 1)
        # reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
        mean_loss = loss / safe_count
        return mean_loss

    def call1(self, y_true, y_predict):
        # Legacy variant: assumes `self.opt` (normalisation statistics) is
        # attached to the instance externally.
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        # Hard (non-smooth) version of the denominator: max(y_true, cap)
        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true) * self.cap)
        loss = (y_true - y_predict) / base
        squared_loss = tf.square(loss)
        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
        return mean_squared_loss

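# Serialization round-trip sketch (illustrative): get_config stores the raw
# cap and from_config rebuilds the instance, so the loss survives model
# save/load when passed via custom_objects={'SouthLoss': SouthLoss}.
# >>> cfg = SouthLoss(cap=0.5).get_config()
# >>> restored = SouthLoss.from_config(cfg)  # restored.cap == 0.5 * 0.2
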

region_loss_d = {
    'northeast': lambda region: RMSE(region),
    'south': lambda cap, region: SouthLoss(cap, region),
    'zone': lambda region: MSE_ZONE(region)  # zone-modelling loss: MSE + zone-sum consistency constraint
}


# Dispatch to the loss that matches the region
def region_loss(opt):
    # Fall back to the northeast handler for unknown regions
    handler = region_loss_d.get(opt.region, region_loss_d['northeast'])
    # Check the handler type before invoking it
    if callable(handler):
        # Lambdas and plain functions are called directly
        if opt.region == "south":  # regions that need an extra argument
            return handler(opt.cap, opt.region)
        else:
            return handler(opt.region)
    else:
        raise TypeError("invalid loss function")

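# Usage sketch with a stand-in options object (`SimpleNamespace` is used here
# purely for illustration; the real `opt` comes from the project's config):
# >>> from types import SimpleNamespace
# >>> region_loss(SimpleNamespace(region='south', cap=0.5))  # -> SouthLoss
# >>> region_loss(SimpleNamespace(region='northeast'))       # -> RMSE
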

if __name__ == '__main__':
    # Test data
    y_true = tf.constant([[1.0], [2.0], [3.0]], dtype=tf.float32)
    y_pred = tf.constant([[1.5], [2.5], [3.5]], dtype=tf.float32)

    # Built-in MSE
    mse = tf.keras.losses.MeanSquaredError()(y_true, y_pred).numpy()
    # Custom loss (equivalent to MSE when the weight param is 1)
    custom_mse = MSE(name='test')(y_true, y_pred).numpy()
    print("built-in MSE:", mse)       # prints: 0.25
    print("custom MSE:", custom_mse)  # should print: 0.25
    assert abs(mse - custom_mse) < 1e-6

    # Variable and optimizer for a gradient check
    y_pred_var = tf.Variable([[1.5], [2.5], [3.5]], dtype=tf.float32)
    optimizer = tf.keras.optimizers.Adam()
    with tf.GradientTape() as tape:
        loss = MSE(name='test')(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    # Analytic gradient: 2 * (y_pred - y_true) / N  (N = 3)
    expected_grads = 2 * (y_pred_var - y_true) / 3.0
    print("actual gradients:\n", grads.numpy())
    print("expected gradients:\n", expected_grads.numpy())

    with tf.GradientTape() as tape:
        loss = tf.keras.losses.MeanSquaredError()(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    print("actual gradients (built-in MSE):\n", grads.numpy())
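
    # A minimal SouthLoss sanity check (a sketch; cap=0.5 is an arbitrary
    # test value). The sigmoid gate makes the denominator a smooth
    # max(y_true, cap * 0.2), so errors are judged relative to whichever
    # of target and scaled capacity is larger.
    south = SouthLoss(cap=0.5)
    y_true_s = tf.constant([[0.05], [0.5]], dtype=tf.float32)
    y_pred_s = tf.constant([[0.10], [0.4]], dtype=tf.float32)
    print("SouthLoss:", south(y_true_s, y_pred_s).numpy())  # ≈ mean([0.25, 0.04])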