Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei 3 months ago
parent
commit
c26b46fc89

+ 81 - 0
common/data_cleaning.py

@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/10/11 11:00
+# file: data_cleaning.py
+# author: David
+# company: shenyang JY
+import numpy as np
+np.random.seed(42)
+
+def cleaning(df, name, logger, cols=None, dup=True):
+    logger.info("开始清洗:{}……".format(name))
+    data = df.copy()
+    data = data_column_cleaning(data, logger)
+    if dup:
+        data = rm_duplicated(data, logger)
+    if cols is not None:
+        data = key_field_row_cleaning(data, cols, logger)
+    return data
+
+
+def data_column_cleaning(data, logger, clean_value=(-99.0, -99)):
+    """
+    列的清洗
+    :param data:
+    :param logger:
+    :param clean_value:
+    :return:
+    """
+    data1 = data.copy()
+    cols_pre = data.columns.to_list()
+    for val in clean_value:
+        data1 = data1.replace(val, np.nan)
+    # drop columns with fewer than 80% non-null values (i.e. more than 20% NaN)
+    data1 = data1.dropna(axis=1, thresh=len(data) * 0.8)
+    # drop columns whose values are all identical
+    data1 = data1.loc[:, (data1 != data1.iloc[0]).any()]
+    data = data[data1.columns.tolist()]
+    cols_late = data.columns.tolist()
+    if len(cols_pre) > len(cols_late):
+        logger.info("列清洗:清洗的列有:{}".format(set(cols_pre) - set(cols_late)))
+    return data
+
+
+def key_field_row_cleaning(data, cols, logger):
+    """
+    行的重要字段清洗: 过滤含有- 99的数字,过滤空值
+    :param data:
+    :param cols: 指定的字段列表
+    :param logger:
+    :return:
+    """
+    rows_pre = len(data)
+    nan_cols = []
+    for col in cols:
+        begin = len(data)
+        if col in data.columns.tolist():
+            # data = data[~((data.loc[:, col] < 0) & (data.loc[:, col].astype(str).str.contains('99')))]
+            data = data[~(data[col] == -99)]
+            data = data[~data.loc[:, col].isnull()]
+        end = len(data)
+        if begin - end > 0:
+            nan_cols.append(col)
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        logger.info("行清洗:清洗的行数有:{},缺失的列有:{}".format(rows_pre-rows_late, ', '.join(nan_cols)))
+    return data
+
+def rm_duplicated(data, logger):
+    """
+    按照时间去重
+    :param data:
+    :param logger:
+    :return:
+    """
+    # 按照时间去重
+    rows_pre = len(data)
+    data = data.drop_duplicates(subset='C_TIME')
+    rows_late = len(data)
+    if rows_pre - rows_late > 0:
+        logger.info("时间去重的行数有:{}".format(rows_pre - rows_late))
+    return data
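
Review note: a minimal usage sketch for the new cleaning() helper. The DataFrame and logger below are stand-ins (C_TIME is the dedup key the module expects; C_REAL_VALUE and C_CONST are invented column names):

    import logging
    import pandas as pd
    from common.data_cleaning import cleaning

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger("cleaning-demo")
    df = pd.DataFrame({
        'C_TIME': ['00:00', '00:15', '00:15', '00:30', '00:45'],
        'C_REAL_VALUE': [1.0, 2.0, 2.0, -99, 3.0],   # -99 is the missing-value sentinel
        'C_CONST': [0, 0, 0, 0, 0],                  # constant column
    })
    out = cleaning(df, 'demo', logger, cols=['C_REAL_VALUE'], dup=True)
    # expected: C_CONST dropped (constant), one duplicate C_TIME row removed,
    # then the -99 row filtered out, leaving 3 rows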

+ 5 - 7
data_processing/data_operation/data_nwp_ftp.py

@@ -72,6 +72,7 @@ def get_moment_next(schedule_dt=False):
         now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
     else:
         now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+    date = now.strftime('%Y%m%d')
     if now.hour == 18:
         moment = '18'
     elif now.hour > 18:
@@ -88,7 +89,7 @@ def get_moment_next(schedule_dt=False):
         moment = '00'
     else:
         moment = '06'
-    return moment
+    return date, moment
 
 def get_moment(schedule_dt=False):
     if schedule_dt:
@@ -110,7 +111,7 @@ def download_zip_files_from_ftp(moment=None):
     date = now.strftime("%Y%m%d")
     date_2 = (now - timedelta(days=2)).strftime("%Y%m%d")
     if moment is None:
-        moment = get_moment_next()
+        date, moment = get_moment_next()
     host = 'xxl'
     ftp_host, ftp_user, ftp_password, remote_dir, local_dir = ftp_params[host]['host'], ftp_params[host]['user'], ftp_params[host]['password'], ftp_params[host]['remote_dir'], ftp_params['xxl']['local_dir']
     zip_extension = f'meteoforce_{date}{str(moment)}_*.zip'
@@ -148,8 +149,8 @@ def download_zip_files_from_ftp(moment=None):
     delete_zip_files(date_2)
 
 def select_file_to_mongo(args):
-    date, moment, farmId, isDq = args['dt'], get_moment_next(args['dt']), args['farmId'], args['isDq']
-    date = datetime.datetime.strptime(date, '%Y-%m-%d %H:%M:%S').strftime("%Y%m%d")
+    date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
+    farmId, isDq = args['farmId'], args['isDq']
     csv_file_format = 'meteoforce_{}_{}_*.csv'.format(farmId, date + str(moment))
     csv_file_weather = csv_file_format.replace('*', 'weather')
     csv_file_power = csv_file_format.replace('*', 'power')
@@ -227,10 +228,7 @@ def get_nwp_from_ftp():
     args = {}
     # print("data_nwp_ftp starts execution!")
     try:
-        now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai")).strftime('%Y-%m-%d %H:%M:%S')
         args = request.values.to_dict()
-        if args.get('dt') is None:
-            args['dt'] = now
        # 1. Get parameters: date, data source, moment, D0-9, farm ID, target mongo DB and collection
         # print('args', args)
         logger.info(args)
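
Review note: get_moment_next now returns a (date, moment) pair instead of just moment, and the callers above were updated to unpack it. A quick sanity check for the fully visible 18:00 branch (hours handled by the elided hunks may differ):

    date, moment = get_moment_next('2024-01-02 18:00:00')
    # date == '20240102', moment == '18'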

+ 5 - 6
data_processing/data_operation/pre_data_ftp.py

@@ -38,6 +38,7 @@ def get_moment_next(schedule_dt=False):
         now = datetime.datetime.strptime(str(schedule_dt), '%Y-%m-%d %H:%M:%S')
     else:
         now = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
+    date = now.strftime('%Y%m%d')
     if now.hour == 18:
         moment = '18'
     elif now.hour > 18:
@@ -54,7 +55,7 @@ def get_moment_next(schedule_dt=False):
         moment = '00'
     else:
         moment = '06'
-    return moment
+    return date, moment
 
 def zip_temp_file(df, args):
     def zip_folder(folder_path, zip_filePath):
@@ -65,15 +66,13 @@ def zip_temp_file(df, args):
                 zip_file.write(file_path, os.path.relpath(file_path, folder_path))
         zip_file.close()
     temp_dir, tem_dir_zip = tempfile.mkdtemp(dir=ftp_params['local_dir']), tempfile.mkdtemp(dir=ftp_params['local_dir'])
-    current_time = datetime.datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai"))
-    dt = current_time.strftime('%Y%m%d')
-    moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
+    date, moment = get_moment_next() if args.get('dt') is None else get_moment_next(args.get('dt'))
     modeler, model, version, farmId = ftp_params[args['user']]['modeler'], args['model'], args['version'], args['farmId']
-    csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, dt, moment)
+    csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format(modeler, model, version, farmId, date, moment)
     csv_path = os.path.join(temp_dir, farmId, csv_file)
     os.makedirs(os.path.dirname(csv_path), exist_ok=True)
     df.to_csv(csv_path, index=False)
-    zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, dt, moment)
+    zip_file = 'jy_{}.{}.{}_{}{}_dq.zip'.format(modeler, model, version, date, moment)
     zip_path = os.path.join(tem_dir_zip, zip_file)
     zip_folder(temp_dir, zip_path)
     shutil.rmtree(temp_dir)
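
Review note: with the shared (date, moment) pair the CSV and ZIP names stay consistent with the FTP download side. Illustrative values only (modeler, model, version and farmId below are made up):

    csv_file = 'jy_{}.{}.{}_{}_{}{}_dq.csv'.format('david', 'lstm', 'v1', 'F001', '20240102', '18')
    # -> 'jy_david.lstm.v1_F001_2024010218_dq.csv'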

+ 139 - 0
models_processing/losses/loss_cdq.py

@@ -0,0 +1,139 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# time: 2023/5/8 13:15
+# file: loss_cdq.py
+# author: David
+# company: shenyang JY
+from keras import backend as K
+import tensorflow as tf
+tf.compat.v1.set_random_seed(1234)
+
+
+class SouthLoss(tf.keras.losses.Loss):
+    def __init__(self, opt, name='south_loss'):
+        """
+        南网新规则损失函数
+        :param cap:装机容量
+        """
+        super(SouthLoss, self).__init__(name=name)
+        self.cap = opt.cap*0.2    # cap is in raw units (not normalized); targets are de-normalized in call() before use
+        self.opt = opt
+        self.cap01 = opt.cap*0.1  # 10% of capacity, required by call2
+
+    def call(self, y_true, y_predict):
+        """
+        自动调用
+        :param y_true: 标签
+        :param y_predict: 预测
+        :return: 损失值
+        """
+        # de-normalize labels and predictions, then take the actual-vs-predicted difference
+        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_true = y_true[:, 15]
+        y_predict = y_predict[:, 15]
+        diff = y_true - y_predict
+        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))  # ~1 where y_true > 0.2*cap, ~0 otherwise
+        base = logistic_values * y_true + (1-logistic_values)*self.cap  # smooth max(y_true, 0.2*cap)
+        loss = K.square(diff/base)
+        # loss = K.mean(loss, axis=-1)
+        return loss
+
+    def call2(self, y_true, y_predict):
+        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_true = y_true[:, 15]
+        y_predict = y_predict[:, 15]
+        diff = y_true - y_predict
+        logistic_values = tf.sigmoid(10000 * (y_true - self.cap))
+        base = logistic_values * y_true + (1 - logistic_values) * self.cap
+        loss = K.square(diff / base)
+
+        mask_logical = tf.logical_and(tf.greater(y_true, self.cap01), tf.greater(y_predict, self.cap01))
+        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
+        safe_count = tf.maximum(count, 1)
+        # reduce_sum_loss = tf.reduce_sum(loss, axis=-1)
+        mean_loss = loss / safe_count
+        return mean_loss
+
+    def call1(self, y_true, y_predict):
+        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        base = tf.where(y_true > self.cap, y_true, tf.ones_like(y_true)*self.cap)
+        loss = (y_true - y_predict) / base
+        squared_loss = tf.square(loss)
+        mean_squared_loss = tf.reduce_mean(squared_loss, axis=[1])
+        return mean_squared_loss
+
+
+class NorthEastLoss(tf.keras.losses.Loss):
+    def __init__(self, opt, name='northeast_loss'):
+        """
+        东北新规则超短期损失函数
+        """
+        super(NorthEastLoss, self).__init__(name=name)
+        self.opt = opt
+        self.cap = round(opt.cap*0.1, 2)
+
+    def call(self, y_true, y_predict):
+        # a small epsilon is added below to avoid division by zero
+        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+
+        mask_logical = tf.logical_and(tf.greater(y_true, self.cap), tf.greater(y_predict, self.cap))
+        # mask = tf.cast(~mask_logical, tf.float32)
+        # y_true = y_true * (1 - mask) + 0 * mask
+        # y_predict = y_predict * (1 - mask) + 0 * mask
+
+
+        epsilon = tf.keras.backend.epsilon()
+        y_predict_safe = y_predict + epsilon
+
+        # compute |y_predict - y_true| / |y_predict_safe|
+        difference_over_predict = tf.abs(y_predict - y_true) / tf.abs(y_predict_safe)
+
+        # clip: entries >= 1 are set to 1, the rest keep their original value
+        masked_difference = tf.where(difference_over_predict >= 1, tf.ones_like(difference_over_predict), difference_over_predict)  # tf.where is element-wise and preserves differentiability of the selected values
+
+        # sum along the feature axis here; tf.reduce_mean could be used instead to average
+        count = tf.reduce_sum(tf.cast(mask_logical, tf.float32), axis=-1)
+        sum_diff = tf.reduce_sum(masked_difference, axis=-1)
+        # mean_loss = tf.reduce_mean(masked_difference, axis=[1])
+        safe_count = tf.maximum(count, 1)
+        mean = sum_diff / safe_count
+        # mean1 = tf.reduce_sum(masked_difference, axis=-1)  # unused; kept commented for reference
+        return mean
+
+
+class NorthWestLoss(tf.keras.losses.Loss):
+    def __init__(self, name='northwest_loss'):
+        """
+        东北新规则超短期损失函数
+        """
+        super(NorthWestLoss, self).__init__(name=name)
+
+    def call(self, y_true, y_pred):
+        # ensure predictions and labels are floats
+        y_pred = tf.cast(y_pred, tf.float32)
+        y_true = tf.cast(y_true, tf.float32)
+
+        # avoid division-by-zero errors
+        epsilon = 1e-8
+        y_pred_adjusted = y_pred + epsilon
+        y_true_adjusted = y_true + epsilon
+
+        # compute |Pr - Pn|
+        abs_diff = tf.abs(y_pred - y_true)
+
+        # total of |Pr - Pn|
+        sum_abs_diff = tf.reduce_sum(abs_diff)
+
+        # per-element weights |Pr - Pn| / sum(|Pr - Pn|)
+        weights = abs_diff / (sum_abs_diff + epsilon)  # epsilon guards against a zero sum
+
+        # compute |Pr/(Pr + Pn) - 0.5|
+        ratios = tf.abs((y_pred_adjusted / (y_pred_adjusted + y_true_adjusted)) - 0.5)
+
+        # final loss value
+        loss = 1.0 - 2.0 * tf.reduce_sum(ratios * weights)
+        return loss