#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/5/8 13:15
# file: losses.py
# author: David
# company: shenyang JY
import tensorflow as tf
from tensorflow.keras.losses import Loss
from models_processing.model_tf.losses_cash import *  # provides SouthLossCash
from typeguard import typechecked

tf.compat.v1.set_random_seed(1234)


class MSE(tf.keras.losses.Loss):
    """
    Custom loss function template.
    Features: name setting, parameter saving, and separation of tensor
    operations from non-tensor operations.
    """
    def __init__(self,
                 name,  # loss name
                 reduction="sum_over_batch_size",  # equivalent to a mean over the batch
                 **kwargs):
        super().__init__(name=name, reduction=reduction)
        # Custom parameters can be added here (kept for serialization)
        self.param = kwargs.get('param', 1.0)

    def call(self, y_true, y_pred):
        """Core computation (separates tensor and non-tensor operations)"""
        # Non-tensor preprocessing: wrap the Python scalar as a tensor
        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
        # Tensor operations (keeps the computation graph-compatible)
        squared_difference = tf.square(y_pred - y_true)
        loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
        return loss
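
# A minimal usage sketch (illustrative; 'param' simply scales the per-sample
# MSE, and the names below are hypothetical):
#   loss = MSE(name='mse_scaled', param=0.5)
#   model.compile(loss=loss, optimizer='adam')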


class MSE_ZONE(tf.keras.losses.Loss):
    """
    Zone-modelling loss: per-feature MSE plus a consistency penalty that ties
    the zone total of the prediction to the zone total of the target.
    """
    def __init__(self,
                 name,  # loss name
                 reduction="sum_over_batch_size",  # equivalent to a mean over the batch
                 **kwargs):
        super().__init__(name=name, reduction=reduction)
        # Custom parameters can be added here (kept for serialization)
        self.param = kwargs.get('param', 1.0)

    def call(self, y_true, y_pred):
        """Core computation (separates tensor and non-tensor operations)"""
        # Non-tensor preprocessing: wrap the Python scalar as a tensor
        coefficient = tf.constant(self.param, dtype=y_pred.dtype)
        # Tensor operations (keeps the computation graph-compatible)
        squared_difference = tf.square(y_pred - y_true)
        mse_loss = tf.reduce_mean(squared_difference, axis=-1) * coefficient
        # Consistency penalty on the zone totals (sums over the last axis)
        sum_loss = tf.reduce_mean(tf.square(tf.reduce_sum(y_pred, axis=-1) - tf.reduce_sum(y_true, axis=-1)))
        loss = mse_loss + 0.2 * sum_loss
        return loss
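
# A worked sketch of the zone term (illustrative values): for a single sample
# with y_true = [1.0, 2.0] and y_pred = [1.5, 2.5], the zone totals are
# 3.0 vs. 4.0, so sum_loss = (4.0 - 3.0)^2 = 1.0 and the penalty added to
# the per-feature MSE is 0.2 * 1.0.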


class RMSE(Loss):
    """Root Mean Squared Error loss (works for single- and multi-variable regression)"""
    def __init__(self,
                 name,
                 reduction="sum_over_batch_size",  # default reduction, equivalent to the mean
                 **kwargs):
        super().__init__(name=name, reduction=reduction)

    def call(self, y_true, y_pred):
        """
        Computation:
        1. squared difference between prediction and target
        2. mean along the last (feature) axis
        3. square root of each sample's loss
        """
        # Make the input tensors compatible (convert and cast dtypes)
        y_pred = tf.convert_to_tensor(y_pred)
        y_true = tf.cast(y_true, y_pred.dtype)
        # Mean squared error along the last axis
        mse_per_sample = tf.reduce_mean(tf.square(y_pred - y_true), axis=-1)
        # Square root of each sample's MSE gives its RMSE
        rmse_per_sample = tf.sqrt(mse_per_sample)
        return rmse_per_sample

    def get_config(self):
        """Serialization support (for model saving/loading)"""
        base_config = super().get_config()
        return base_config


class SouthLoss(Loss):
    """
    China Southern Power Grid new-rule loss (fully serializable).

    Args:
        cap (float): normalized installed capacity (expected in [0, 1])
        name (str): loss name, defaults to 'south_loss'
        reduction (str): loss reduction, defaults to 'sum_over_batch_size'

    Example:
        # >>> loss = SouthLoss(cap=0.5)
        # >>> model.compile(loss=loss, optimizer='adam')
    """
    @typechecked
    def __init__(self,
                 cap: float,
                 name: str = "south_loss",
                 reduction: str = "sum_over_batch_size"):
        # Parameter validation
        # if not 0 <= cap <= 1:
        #     raise ValueError("cap must be a normalized value in [0, 1]")
        super().__init__(name=name, reduction=reduction)
        # Scaling is handled internally (the original cap is what gets serialized)
        self._raw_cap = cap  # original value, kept for serialization
        self.cap = tf.constant(cap * 0.2, dtype=tf.float32)  # value used in the computation

    def get_config(self):
        """Serialization config (stores the original cap value)"""
        config = super().get_config()
        config.update({
            "cap": self._raw_cap,  # the unscaled original value
            "name": self.name,
            "reduction": self.reduction
        })
        return config

    @classmethod
    def from_config(cls, config):
        """Rebuild the instance during deserialization"""
        return cls(
            cap=config["cap"],
            name=config["name"],
            reduction=config["reduction"]
        )
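
    # A minimal serialization round-trip sketch (illustrative):
    #   loss = SouthLoss(cap=0.5)
    #   cfg = loss.get_config()            # stores the unscaled cap, 0.5
    #   restored = SouthLoss.from_config(cfg)
    #   # restored.cap is tf.constant(0.1), since __init__ rescales by 0.2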

    def call(self, y_true, y_pred):
        """Compute the loss (with numerical stabilization)"""
        # Keep dtypes consistent
        y_true = tf.cast(y_true, tf.float32)
        y_pred = tf.cast(y_pred, tf.float32)
        # Numerical stabilization
        diff = y_true - y_pred
        delta = y_true - self.cap
        # Stabilized sigmoid: clip the input to avoid exploding gradients
        logistic_input = tf.clip_by_value(10000.0 * delta, -50.0, 50.0)
        logistic_values = tf.sigmoid(logistic_input)
        # Base value: close to y_true above the cap, close to cap below it
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        # Avoid division by zero
        safe_base = tf.where(tf.equal(base, 0.0), 1e-7, base)
        # Relative squared error against the base
        loss = tf.reduce_mean(tf.square(diff / safe_base), axis=-1)
        return loss

    def call2(self, y_true, y_predict):
        # Legacy variant: relies on self.opt (de-normalization statistics) and
        # self.cap01, neither of which is set in __init__
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_true = y_true[:, 15]
        y_predict = y_predict[:, 15]
        diff = y_true - y_predict
        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
        base = logistic_values * y_true + (1 - logistic_values) * self.cap
        loss = tf.square(diff / base)
        # Count the samples where both target and prediction exceed the threshold
        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01), tf.greater(y_predict, self.cap01))
        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
        safe_count = tf.maximum(count, 1)
        # reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
        mean_loss = loss / safe_count
        return mean_loss

    def call1(self, y_true, y_predict):
        # Legacy variant: relies on self.opt (de-normalization statistics),
        # which is not set in __init__
        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
        # Base: y_true where it exceeds the cap, otherwise the cap itself
        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true) * self.cap)
        loss = (y_true - y_predict) / base
        squared_loss = tf.square(loss)
        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
        return mean_squared_loss


class NorthChina(Loss):
    """North China loss: RMSE with each squared error weighted by its share of the total absolute error"""
    def __init__(self,
                 name="north_china_loss",
                 reduction="sum_over_batch_size",  # default reduction, equivalent to the mean
                 **kwargs):
        super().__init__(name=name, reduction=reduction)

    def call(self, y_true, y_pred):
        # Error e = y_true - y_pred
        error = y_true - y_pred
        abs_error = tf.abs(error)
        # epsilon guards against division by zero
        epsilon = 1e-8
        weight = abs_error / (tf.reduce_sum(abs_error) + epsilon)
        weighted_squared_error = tf.square(error) * weight
        loss = tf.sqrt(tf.reduce_sum(weighted_squared_error))
        return loss
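
    # In closed form this is sqrt(sum_i |e_i|^3 / sum_j |e_j|): each squared
    # error is weighted by that error's share of the total absolute error,
    # so large errors dominate the objective.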

    def get_config(self):
        """Serialization support (for model saving/loading)"""
        base_config = super().get_config()
        return base_config


# Mapping from region to its loss constructor
region_loss_d = {
    'northeast': lambda region: RMSE(region),
    'south': lambda cap, region: SouthLoss(cap, region),
    'south_cash': lambda cap, region: SouthLossCash(cap, region),
    'zone': lambda region: MSE_ZONE(region),  # zone-modelling loss: MSE + zone-total consistency constraint
    'northchina': lambda region: NorthChina(region)  # North China loss
}


# Dispatch to the region-specific loss
def region_loss(opt):
    # Fall back to the northeast handler for unknown regions
    handler = region_loss_d.get(opt.region, region_loss_d['northeast'])
    # Check the handler type and invoke it
    if callable(handler):
        # lambdas and functions are called directly
        if str(opt.region).startswith("south"):  # regions that need the extra cap argument
            return handler(opt.cap, opt.region)
        else:
            return handler(opt.region)
    else:
        raise TypeError("invalid loss function")
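
# A minimal usage sketch (hypothetical 'opt'; region_loss only reads
# opt.region and, for southern regions, opt.cap):
#   from types import SimpleNamespace
#   opt = SimpleNamespace(region='south', cap=0.5)
#   loss_fn = region_loss(opt)  # -> SouthLoss(cap=0.5, name='south')
#   model.compile(loss=loss_fn, optimizer='adam')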


if __name__ == '__main__':
    # Test data
    y_true = tf.constant([[1.0], [2.0], [3.0]], dtype=tf.float32)
    y_pred = tf.constant([[1.5], [2.5], [3.5]], dtype=tf.float32)
    # Standard MSE
    mse = tf.keras.losses.MeanSquaredError()(y_true, y_pred).numpy()
    # Custom loss (NorthChina weighted RMSE; not identical to MSE)
    custom_mse = NorthChina(name='test')(y_true, y_pred).numpy()
    print("standard MSE:", mse)  # prints: 0.25
    # With equal absolute errors the weighting is uniform, so the NorthChina
    # loss reduces to sqrt(MSE) and prints 0.5
    print("NorthChina loss:", custom_mse)
    # assert abs(mse - custom_mse) < 1e-6

    # Variable and optimizer
    y_pred_var = tf.Variable([[1.5], [2.5], [3.5]], dtype=tf.float32)
    optimizer = tf.keras.optimizers.Adam()
    with tf.GradientTape() as tape:
        loss = NorthChina(name='test')(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    # MSE reference gradient: 2 * (y_pred - y_true) / N (N=3); the NorthChina
    # gradient will differ from it
    expected_grads = 2 * (y_pred_var - y_true) / 3.0
    print("actual gradient:\n", grads.numpy())
    print("MSE reference gradient:\n", expected_grads.numpy())

    with tf.GradientTape() as tape:
        loss = tf.keras.losses.MeanSquaredError()(y_true, y_pred_var)
    grads = tape.gradient(loss, y_pred_var)
    print("actual gradient 1:\n", grads.numpy())
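
    # A quick SouthLoss sanity check (illustrative; cap=1.0 is an assumed
    # normalized capacity, and the printed value is not a reference result)
    south = SouthLoss(cap=1.0)
    print("SouthLoss:", south(y_true, y_pred).numpy())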