#!/usr/bin/env python
# -*- coding: utf-8 -*-
# time: 2023/5/8 13:15
# file: loss.py.py
# author: David
# company: shenyang JY
"""Custom Keras losses modelled on regional grid forecast-accuracy rules.

Every loss subclasses ``tf.keras.losses.Loss``; Keras invokes ``call`` with
the label tensor and the prediction tensor.
"""
from keras import backend as K
import tensorflow as tf

tf.compat.v1.set_random_seed(1234)


def _denormalize(opt, values):
    """Map normalized ``C_REAL_VALUE`` data back via ``values * std + mean``."""
    return values * opt.std['C_REAL_VALUE'] + opt.mean['C_REAL_VALUE']


class SouthLoss(tf.keras.losses.Loss):
    """Loss for the new Southern Grid rule.

    The squared error is taken relative to ``max(actual, 0.2 * capacity)``,
    so low-output samples are scored against a capacity-based floor rather
    than their own (small) actual value.
    """

    # Column of the second axis that ``call`` scores.
    # NOTE(review): presumably one specific forecast step -- confirm with caller.
    _TARGET_STEP = 15

    def __init__(self, opt, name='south_loss'):
        """
        :param opt: options object; this class reads ``opt.cap`` (installed
            capacity), ``opt.std['C_REAL_VALUE']`` and
            ``opt.mean['C_REAL_VALUE']``
        :param name: name handed to the Keras ``Loss`` base class
        """
        super().__init__(name=name)
        # The threshold stays in raw (non-normalized) units, which is why
        # ``call`` de-normalizes its inputs before comparing against it.
        self.cap = opt.cap * 0.2
        self.opt = opt

    def call(self, y_true, y_predict):
        """Return the squared relative error of column ``_TARGET_STEP``.

        :param y_true: normalized labels, indexed as ``[:, _TARGET_STEP]``
        :param y_predict: normalized predictions, same layout as ``y_true``
        :return: one loss value per row of the inputs
        """
        y_true = _denormalize(self.opt, y_true)[:, self._TARGET_STEP]
        y_predict = _denormalize(self.opt, y_predict)[:, self._TARGET_STEP]
        diff = y_true - y_predict
        # A very steep sigmoid acts as a differentiable switch: the base is
        # ~y_true when y_true is above the threshold and ~cap otherwise.
        gate = tf.sigmoid(10000 * (y_true - self.cap))
        base = gate * y_true + (1 - gate) * self.cap
        return tf.square(diff / base)

    def call1(self, y_true, y_predict):
        """Alternative formulation with a hard threshold over all columns.

        Not invoked by Keras (only ``call`` is); kept as a reference variant.

        :param y_true: normalized labels
        :param y_predict: normalized predictions
        :return: squared relative error averaged over axis 1
        """
        y_true = _denormalize(self.opt, y_true)
        y_predict = _denormalize(self.opt, y_predict)
        # Hard version of the base used in ``call``: max(y_true, cap).
        base = tf.where(y_true > self.cap, y_true,
                        tf.ones_like(y_true) * self.cap)
        relative_error = (y_true - y_predict) / base
        return tf.reduce_mean(tf.square(relative_error), axis=[1])


class NorthEastLoss(tf.keras.losses.Loss):
    """Ultra-short-term loss for the new Northeast rule.

    Scores ``|pred - actual| / |pred|`` per element, clipped to at most 1.
    """

    def __init__(self, opt, name='northeast_loss'):
        """
        :param opt: options object; this class reads ``opt.cap``,
            ``opt.std['C_REAL_VALUE']`` and ``opt.mean['C_REAL_VALUE']``
        :param name: name handed to the Keras ``Loss`` base class
        """
        super().__init__(name=name)
        self.opt = opt
        # Threshold: 10% of ``opt.cap``, rounded to two decimals.
        self.cap = round(opt.cap * 0.1, 2)

    def call(self, y_true, y_predict):
        """Return the clipped relative error, normalized per row.

        :param y_true: normalized labels
        :param y_predict: normalized predictions
        :return: per-row sum of clipped errors divided by the number of
            elements in that row where both values exceed ``self.cap``
            (the divisor is floored at 1)
        """
        y_true = _denormalize(self.opt, y_true)
        y_predict = _denormalize(self.opt, y_predict)
        both_above_cap = tf.logical_and(tf.greater(y_true, self.cap),
                                        tf.greater(y_predict, self.cap))

        # Guard the division. ``|y_predict| + epsilon`` is never below
        # epsilon, whereas ``|y_predict + epsilon|`` is zero for a prediction
        # of exactly -epsilon; a resulting inf would poison the gradient even
        # though ``tf.where`` below discards the value itself.
        epsilon = tf.keras.backend.epsilon()
        denominator = tf.abs(y_predict) + epsilon
        relative_error = tf.abs(y_predict - y_true) / denominator

        # Clip element-wise: anything >= 1 becomes exactly 1.
        clipped = tf.where(relative_error >= 1,
                           tf.ones_like(relative_error),
                           relative_error)

        # NOTE(review): the numerator sums the clipped error of *every*
        # element, while the divisor counts only elements where both values
        # exceed the threshold. Behavior kept as in the original -- confirm
        # whether the sum was meant to be masked as well.
        count = tf.reduce_sum(tf.cast(both_above_cap, clipped.dtype), axis=-1)
        total = tf.reduce_sum(clipped, axis=-1)
        # Floor the divisor at 1 so rows without qualifying elements do not
        # divide by zero.
        return total / tf.maximum(count, 1)


class NorthWestLoss(tf.keras.losses.Loss):
    """Loss for the Northwest rule.

    Computes ``1 - 2 * sum(w * |Pr / (Pr + Pn) - 0.5|)`` where
    ``w = |Pr - Pn| / sum(|Pr - Pn|)``, ``Pr`` is ``y_pred`` and ``Pn`` is
    ``y_true``. Unlike the other losses in this file, inputs are used as
    given (no de-normalization) and all sums run over the entire tensor.
    """

    def __init__(self, name='northwest_loss'):
        """
        :param name: name handed to the Keras ``Loss`` base class
        """
        super().__init__(name=name)

    def call(self, y_true, y_pred):
        """Return the scalar Northwest-rule loss for the whole batch.

        :param y_true: labels
        :param y_pred: predictions
        :return: scalar tensor
        """
        # Work in float32 regardless of the incoming dtype.
        y_pred = tf.cast(y_pred, tf.float32)
        y_true = tf.cast(y_true, tf.float32)

        # Small offset so that Pr + Pn is not zero when both inputs are zero.
        epsilon = 1e-8
        pred_shifted = y_pred + epsilon
        true_shifted = y_true + epsilon

        # Weight of each element: its share of the total absolute error.
        abs_diff = tf.abs(y_pred - y_true)
        weights = abs_diff / (tf.reduce_sum(abs_diff) + epsilon)

        # |Pr / (Pr + Pn) - 0.5|
        ratios = tf.abs(pred_shifted / (pred_shifted + true_shifted) - 0.5)

        return 1.0 - 2.0 * tf.reduce_sum(ratios * weights)