瀏覽代碼

Merge branch 'dev_awg' of anweiguo/algorithm_platform into dev_david

liudawei 3 月之前
父節點
當前提交
6ae27b1ddf

+ 22 - 0
common/alert.py

@@ -0,0 +1,22 @@
+import requests
+import json
+
+# Webhook URL
+url = 'https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=553f02de-0ef4-49ce-8d68-2489c032d42f'
+def send_message(componment, farmId, text):
+    # 设置请求头
+    headers = {'Content-Type': 'application/json'}
+    # 设置消息内容
+    data = {
+        "msgtype": "markdown",
+        "markdown": {
+            "content": f"""<font color=\'warning\'>告警</font>
+            >组件:<font color=\'comment\'>{componment}</font>
+            >场站:<font color=\'comment\'>{farmId}</font>
+            >类型:<font color=\'comment\'>{text}</font>"""
+        }
+    }
+    # 发送POST请求
+    response = requests.post(url, headers=headers, data=json.dumps(data))
+    # 输出响应内容
+    print(response.text)

+ 16 - 1
common/processing_data_common.py

@@ -1,4 +1,6 @@
 import random
+from datetime import date, timedelta
+
 def str_to_list(arg):
     if arg == '':
         return []
@@ -6,7 +8,6 @@ def str_to_list(arg):
         return arg.split(',')
 
 
-
 # 随机生成唯一颜色
 def generate_unique_colors(num_colors):
     generated_colors = set()
@@ -30,3 +31,17 @@ def missing_features(df, features, col_time, threshold=0.2):
     df = df[~df['day'].isin(days_with_high_missing)]
     print("**********删除后维度", df.shape)
     return df.drop('day',axis=1)
+
+
+def check_nwp_data(nwp_df,features):
+    tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
+    if ~all(item in nwp_df.columns for item in features):
+        diff = set(features)-set(nwp_df.columns)
+        message = f"NWP特征列缺失!features:{diff}"
+    #判断日前短期NWP是否缺数据
+    elif  len(nwp_df[nwp_df['date_time'].contains(tomorrow)])<96:
+        message = "日前数据记录缺失,不足96条!"
+    else:
+        message=''
+    return message
+

+ 1 - 1
data_processing/processing_limit_power/processing_limit_power_by_statistics_light.py

@@ -28,7 +28,7 @@ def light_statistics_judgement(df_power,args):
     # 提取辐射度和实际功率
     df_power[col_radiance] = df_power[col_radiance].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128)  else  float(x) if isinstance(x, numbers.Number) else np.nan)
     df_power[col_power] = df_power[col_power].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128)  else  float(x) if isinstance(x, numbers.Number) else np.nan)
-    df_power = df_power[(~pd.isna(df_power[col_radiance]))&(~pd.isna(df_power[col_power]))&(~((df_power[col_radiance]<=0)&(df_power[col_power]>0)))] 
+    df_power = df_power[(~pd.isna(df_power[col_radiance]))&(~pd.isna(df_power[col_power]))&(~((df_power[col_radiance]<=0)&(df_power[col_power]>0)))&(~((df_power[col_radiance]>0)&(df_power[col_power]<=0)))]
     
     X = df_power[[col_radiance]].values
     y = df_power[col_power].values

+ 63 - 2
models_processing/model_predict/model_prediction_lightgbm.py

@@ -6,6 +6,10 @@ import time
 import logging
 import traceback
 from common.database_dml import get_data_from_mongo,insert_data_into_mongo
+from common.alert import send_message
+from datetime import datetime, timedelta
+import pytz
+from pytz import timezone
 app = Flask('model_prediction_lightgbm——service')
 def str_to_list(arg):
     if arg == '':
@@ -13,6 +17,59 @@ def str_to_list(arg):
     else:
         return arg.split(',')
 
+
+def forecast_data_distribution(pre_data,args):
+    col_time = args['col_time']
+
+    # tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
+    tomorrow = (datetime.now(pytz.utc).astimezone(timezone("Asia/Shanghai")) + timedelta(days=1)).strftime('%Y-%m-%d')
+    field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure':'surfacePressure'}
+    # 根据字段映射重命名列
+    pre_data = pre_data.rename(columns=field_mapping)
+
+    if len(pre_data)==0:
+        send_message('lightgbm预测组件', args['farmId'], '请注意:获取NWP数据为空,预测文件无法生成!')
+        result = pd.DataFrame({'farm_id':[], col_time:[], 'power_forecast':[]})
+
+    elif len(pre_data[pre_data[col_time].str.contains(tomorrow)])<96:
+        send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
+        start_time = pre_data[col_time].min()
+        end_time = pre_data[col_time].max()
+        date_range = pd.date_range(start=start_time, end=end_time, freq='15T').strftime('%Y-%m-%d %H:%M:%S').tolist()
+        df_date = pd.DataFrame({col_time:date_range})
+        result = pd.merge(df_date,pre_data,how='left',on=col_time).sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
+        result = result[['farm_id', 'date_time', 'power_forecast']]
+    else:
+        df = pre_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
+        mongodb_connection, mongodb_database, mongodb_model_table, model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", \
+        args['mongodb_database'], args['mongodb_model_table'], args['model_name']
+        client = MongoClient(mongodb_connection)
+        db = client[mongodb_database]
+        collection = db[mongodb_model_table]
+        model_data = collection.find_one({"model_name": model_name})
+        if model_data is not None:
+            model_binary = model_data['model']  # 确保这个字段是存储模型的二进制数据
+            # 反序列化模型
+            model = pickle.loads(model_binary)
+            diff = set(model.feature_name()) - set(pre_data.columns)
+            if len(diff) > 0:
+                send_message('lightgbm预测组件', args['farmId'], f'NWP特征列缺失,使用DQ代替!features:{diff}')
+                result = pre_data[['farm_id', 'date_time', 'power_forecast']]
+            else:
+                df['power_forecast'] = model.predict(df[model.feature_name()])
+                df.loc[df['power_forecast'] < 0, 'power_forecast'] = 0
+                # 添加小时列 把光夜间置为0
+                df["hour"] = pd.to_datetime(df["date_time"]).dt.hour
+                df.loc[(df["hour"] >= 20) | (df["hour"] < 6), 'power_forecast'] = 0
+                print("model predict result  successfully!")
+                result = df[['farm_id', 'date_time', 'power_forecast']]
+        else:
+            send_message('lightgbm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
+            result = pre_data[['farm_id', 'date_time', 'power_forecast']]
+    result['power_forecast'] = round(result['power_forecast'],2)
+    return result
+
+
 def model_prediction(df,args):
     mongodb_connection,mongodb_database,mongodb_model_table,model_name,col_reserve = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_model_table'],args['model_name'],str_to_list(args['col_reserve'])
     client = MongoClient(mongodb_connection)
@@ -45,9 +102,13 @@ def model_prediction_lightgbm():
         args = request.values.to_dict()
         print('args',args)
         logger.info(args)
+        forecast_file = int(args['forecast_file'])
         power_df = get_data_from_mongo(args)
-        model = model_prediction(power_df,args)
-        insert_data_into_mongo(model,args)
+        if forecast_file == 1:
+            predict_data = forecast_data_distribution(power_df, args)
+        else:
+            predict_data = model_prediction(power_df, args)
+        insert_data_into_mongo(predict_data,args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()

+ 43 - 2
models_processing/model_predict/model_prediction_lstm.py

@@ -6,6 +6,9 @@ import numpy as np
 from itertools import chain
 from common.database_dml import get_data_from_mongo,insert_data_into_mongo,get_h5_model_from_mongo,get_scaler_model_from_mongo
 from common.processing_data_common import str_to_list
+from common.alert import send_message
+from datetime import date, timedelta
+import pandas as pd
 app = Flask('model_prediction_lstm——service')
 
 
@@ -23,6 +26,40 @@ def create_sequences(data_features,data_target,time_steps):
         return np.array(X), np.array(y)
 
 
+def forecast_data_distribution(pre_data,args):
+    features, time_steps, col_time, model_name = str_to_list(args['features']), int(args['time_steps']), \
+        args['col_time'], args['model_name'],
+    feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
+    tomorrow = (date.today() + timedelta(days=1)).strftime('%Y-%m-%d')
+    field_mapping = {'clearsky_ghi': 'clearskyGhi', 'dni_calcd': 'dniCalcd','surface_pressure':'surfacePressure',}
+    # 根据字段映射重命名列
+    pre_data = pre_data.rename(columns=field_mapping)
+    diff = set(features) - set(pre_data.columns)
+    if len(pre_data)==0:
+        send_message('lstm预测组件', args['farmId'], '请注意:获取NWP数据为空,预测文件无法生成!')
+        result = pd.DataFrame({col_time:[],'farm_id':[],'power_forecast':[]})
+    elif len(diff)>0:
+        send_message('lstm预测组件', args['farmId'], f'NWP特征列缺失!features:{diff}')
+        result = pre_data[['date_time', 'farm_id', 'power_forecast']]
+    elif len(pre_data[pre_data[col_time].str.contains(tomorrow)])<96:
+        send_message('lstm预测组件', args['farmId'], "日前数据记录缺失,不足96条,用DQ代替并补值!")
+        start_time = pre_data[col_time].min()
+        end_time = pre_data[col_time].max()
+        date_range = pd.date_range(start=start_time, end=end_time, freq='15T').strftime('%Y-%m-%d %H:%M:%S').tolist()
+        df_date = pd.DataFrame({col_time:date_range})
+        result = pd.merge(df_date,pre_data,how='left',on=col_time).sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
+        result = result[['date_time', 'farm_id', 'power_forecast']]
+    else:
+        df = pre_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
+        scaled_features = feature_scaler.transform(df[features])
+        X_predict, _ = create_sequences(scaled_features, [], time_steps)
+        model = get_h5_model_from_mongo(args)
+        y_predict = list(chain.from_iterable(target_scaler.inverse_transform([model.predict(X_predict).flatten()])))
+        result = df[-len(y_predict):]
+        result['power_forecast'] = y_predict
+        result.loc[result['power_forecast'] < 0, 'power_forecast'] = 0
+    return result[['date_time','farm_id','power_forecast']]
+
 def model_prediction(df,args):
     if 'is_limit' in df.columns:
         df = df[df['is_limit'] == False]
@@ -54,9 +91,13 @@ def model_prediction_lstm():
         args = request.values.to_dict()
         print('args',args)
         logger.info(args)
+        forecast_file = int(args['forecast_file'])
         power_df = get_data_from_mongo(args)
-        model = model_prediction(power_df,args)
-        insert_data_into_mongo(model,args)
+        if forecast_file == 1:
+            predict_data = forecast_data_distribution(power_df,args)
+        else:
+            predict_data = model_prediction(power_df,args)
+        insert_data_into_mongo(predict_data,args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()

+ 1 - 1
models_processing/model_predict/res_prediction.py

@@ -57,5 +57,5 @@ if __name__ == "__main__":
     logger = logging.getLogger("res_prediction log")
     from waitress import serve
 
-    serve(app, host="0.0.0.0", port=10100)
+    serve(app, host="0.0.0.0", port=10105)
     print("server start!")

+ 1 - 0
run_all.py

@@ -17,6 +17,7 @@ services = [
     ("models_processing/model_predict/model_prediction_lstm.py", 10097),
     ("post_processing/post_processing.py", 10098),
     ("evaluation_processing/analysis.py", 10099),
+    ("models_processing/model_predict/res_prediction.py", 10105),
     ("data_processing/data_operation/pre_data_ftp.py", 10101),
     ("data_processing/data_operation/data_nwp_ftp.py", 10102),
     ("models_processing/model_train/model_training_bp.py", 10103),