awg commit algorithm components

anweiguo 4 months ago
parent commit 8e61bb6fdf

+ 2 - 2
evaluation_processing/analysis_report.py

@@ -225,8 +225,8 @@ def put_analysis_report_to_html(args, df_clean, df_predict, df_accuracy):
         y=df_predict[label],
         mode='lines+markers',
        name='实际功率',  # Actual power
-        line=dict(dash='dot', width=2),  # dotted line
-        marker=dict(symbol='cross'),
+        line=dict(width=1),  # thin solid line
+        marker=dict(symbol='circle'),
     ))
    # Add predicted and actual-power curves for each model
     for model in models:
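For reference, a minimal standalone sketch of the two Plotly trace styles this hunk swaps; the sample data and figure setup are illustrative only, not part of the commit:

```python
import plotly.graph_objects as go

x = [1, 2, 3, 4]                  # hypothetical time axis
y = [10.0, 12.5, 11.0, 13.2]      # hypothetical actual-power values

fig = go.Figure()
# Old style: dotted line with cross markers
fig.add_trace(go.Scatter(x=x, y=y, mode='lines+markers', name='old',
                         line=dict(dash='dot', width=2), marker=dict(symbol='cross')))
# New style: thin solid line with circle markers
fig.add_trace(go.Scatter(x=x, y=y, mode='lines+markers', name='new',
                         line=dict(width=1), marker=dict(symbol='circle')))
fig.write_html('style_comparison.html')  # write to a standalone HTML file
```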

+ 68 - 0
models_processing/model_predict/model_prediction_bp.py

@@ -0,0 +1,68 @@
+from flask import Flask,request
+import time
+import logging
+import traceback
+from itertools import chain
+from common.database_dml import get_data_from_mongo,insert_data_into_mongo,get_h5_model_from_mongo,get_scaler_model_from_mongo
+from common.processing_data_common import str_to_list
+app = Flask('model_prediction_bp——service')
+
+
+# Run prediction using the stored scalers and h5 model
+def model_prediction(df,args):
+    if 'is_limit' in df.columns:
+        df = df[df['is_limit'] == False]
+    features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
+    feature_scaler,target_scaler = get_scaler_model_from_mongo(args)
+    df = df.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
+    scaled_features = feature_scaler.transform(df[features])
+    # X_predict, _ = create_sequences(scaled_features, [], time_steps)
+    # Pass the custom loss function when loading the model
+    # model = load_model(f'{farmId}_model.h5', custom_objects={'rmse': rmse})
+    model = get_h5_model_from_mongo(args)
+    y_predict = list(chain.from_iterable(target_scaler.inverse_transform([model.predict(scaled_features).flatten()])))
+    result = df[-len(y_predict):].copy()  # copy to avoid pandas SettingWithCopyWarning
+    result['predict'] = y_predict
+    result.loc[result['predict'] < 0, 'predict'] = 0
+    result['model'] = model_name
+    features_reserve = col_reserve + ['model', 'predict']
+    return result[list(set(features_reserve))]  # de-duplicate; pandas does not accept a set indexer
+
+
+@app.route('/model_prediction_bp', methods=['POST'])
+def model_prediction_bp():
+    # Record the program start time
+    start_time = time.time()  
+    result = {}
+    success = 0
+    args = {}  # ensure 'args' exists in the response even if parsing fails
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        print('args',args)
+        logger.info(args)
+        power_df = get_data_from_mongo(args)
+        model = model_prediction(power_df,args)
+        insert_data_into_mongo(model,args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc().replace("\n", "\t")  # flatten the traceback to one line
+        result['msg'] = my_exception
+    end_time = time.time() 
+   
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+
+if __name__=="__main__":  
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("model_prediction_bp log")
+    from waitress import serve
+    serve(app, host="0.0.0.0", port=10104)
+    print("server start!")
+    
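A hedged usage sketch for the new endpoint, assuming the service is running locally on port 10104. The MongoDB connection fields that get_data_from_mongo, get_h5_model_from_mongo and insert_data_into_mongo expect are defined elsewhere in the repository, so every value below is a placeholder:

```python
import requests

# Placeholder request parameters; only the keys read in model_prediction()
# (features, time_steps, col_time, model_name, col_reserve) are shown here.
args = {
    'features': 'radiation,temperature,humidity',  # parsed by str_to_list
    'time_steps': '1',
    'col_time': 'dateTime',
    'model_name': 'bp_model',
    'col_reserve': 'dateTime,realPower',
}
resp = requests.post('http://127.0.0.1:10104/model_prediction_bp', data=args)
print(resp.json())  # {'success': 1, ...} on success; the traceback lands in 'msg' on failure
```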

+ 3 - 0
models_processing/model_predict/model_prediction_lightgbm.py

@@ -19,6 +19,9 @@ def model_prediction(df,args):
     db = client[mongodb_database]
     collection = db[mongodb_model_table]
     model_data = collection.find_one({"model_name": model_name})
+    if 'is_limit' in df.columns:
+        df = df[df['is_limit'] == False]
+
     if model_data is not None:
         model_binary = model_data['model']  # this field must hold the model's binary data
         # Deserialize the model

+ 2 - 0
models_processing/model_predict/model_prediction_lstm.py

@@ -24,6 +24,8 @@ def create_sequences(data_features,data_target,time_steps):
 
 
 def model_prediction(df,args):
+    if 'is_limit' in df.columns:
+        df = df[df['is_limit'] == False]
     features, time_steps, col_time, model_name,col_reserve =  str_to_list(args['features']), int(args['time_steps']),args['col_time'],args['model_name'],str_to_list(args['col_reserve'])
     feature_scaler,target_scaler = get_scaler_model_from_mongo(args)
     df = df.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
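This hunk's context references create_sequences (also commented out in model_prediction_bp.py); its body is not part of this diff, but a typical sliding-window implementation looks like the sketch below (an assumption, not necessarily the repository's code):

```python
import numpy as np

# Hypothetical sliding-window builder; the actual create_sequences in
# model_prediction_lstm.py is not shown in this diff.
def create_sequences(data_features, data_target, time_steps):
    X, y = [], []
    for i in range(len(data_features) - time_steps + 1):
        X.append(data_features[i:i + time_steps])      # window of features
        if len(data_target) > 0:
            y.append(data_target[i + time_steps - 1])  # target aligned to window end
    return np.array(X), np.array(y)
```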

+ 2 - 0
models_processing/model_predict/res_prediction.py

@@ -30,6 +30,8 @@ def model_prediction_lightgbm():
         logger.info(args)
         col_reserve = str_to_list(args['col_reserve'])
         power_df = get_data_from_mongo(args)
+        if 'is_limit' in power_df.columns:
+            power_df = power_df[power_df['is_limit'] == False]
         power_df['model'] = args['model']
         power_df['predict'] = power_df[args['col_pre']]
         features_reserve = col_reserve + ['model', 'predict']
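The same is_limit guard now appears in every prediction path. A toy illustration of what it filters (the column values are made up):

```python
import pandas as pd

df = pd.DataFrame({
    'realPower': [10.0, 0.0, 12.5],    # hypothetical power readings
    'is_limit': [False, True, False],  # True marks curtailed (power-limited) samples
})
# Mirror the guard added in these hunks: keep only non-curtailed rows
if 'is_limit' in df.columns:
    df = df[df['is_limit'] == False]
print(df)  # rows 0 and 2 remain
```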

+ 129 - 0
models_processing/model_train/model_training_bp.py

@@ -0,0 +1,129 @@
+import numpy as np
+from sklearn.model_selection import train_test_split
+from flask import Flask,request
+import time
+import traceback
+import logging
+from sklearn.preprocessing import MinMaxScaler
+from io import BytesIO
+import joblib
+from tensorflow.keras.models import Sequential
+from tensorflow.keras.layers import LSTM, Dense, Dropout
+from tensorflow.keras.callbacks import EarlyStopping, ReduceLROnPlateau
+import tensorflow as tf
+from common.database_dml import get_data_from_mongo,insert_h5_model_into_mongo
+from common.processing_data_common import missing_features,str_to_list
+import time
+import random
+import matplotlib.pyplot as plt
+app = Flask('model_training_bp——service')
+
+def rmse(y_true, y_pred):
+    return tf.math.sqrt(tf.reduce_mean(tf.square(y_true - y_pred)))
+
+def draw_loss(history):
+    # Plot training and validation loss
+    plt.figure(figsize=(20, 8))
+    plt.plot(history.history['loss'], label='Training Loss')
+    plt.plot(history.history['val_loss'], label='Validation Loss')
+    plt.title('Loss Curve')
+    plt.xlabel('Epochs')
+    plt.ylabel('Loss')
+    plt.legend()
+    plt.show()
+# Build and train the BP model
+
+def build_model(data, args):
+    sleep_time = random.uniform(1, 20)  # random 1-20 s delay to stagger concurrent training jobs
+    time.sleep(sleep_time)
+    tf.keras.backend.clear_session()  # clear the current graph and session
+    # Set random seeds for reproducibility
+    np.random.seed(42)  # NumPy seed
+    tf.random.set_seed(42)  # TensorFlow seed
+    col_time, features, target = args['col_time'], str_to_list(args['features']), args['target']
+    if 'is_limit' in data.columns:
+        data = data[data['is_limit']==False]
+    # Drop days whose average feature missing rate exceeds 20%
+    data = missing_features(data, features, col_time)
+    train_data = data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
+    # Create scalers for the features and the target
+    feature_scaler = MinMaxScaler(feature_range=(0, 1))
+    target_scaler = MinMaxScaler(feature_range=(0, 1))
+    # Scale the features and the target
+    scaled_features = feature_scaler.fit_transform(train_data[features])
+    scaled_target = target_scaler.fit_transform(train_data[[target]])
+    # Serialize both scalers
+    feature_scaler_bytes = BytesIO()
+    joblib.dump(feature_scaler, feature_scaler_bytes)
+    feature_scaler_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
+    target_scaler_bytes = BytesIO()
+    joblib.dump(target_scaler, target_scaler_bytes)
+    target_scaler_bytes.seek(0)
+
+    # Split into training and test sets
+    X_train, X_test, y_train, y_test = train_test_split(scaled_features, scaled_target, test_size=0.2, random_state=43)
+
+    # Build the BP (fully connected) model
+    model = Sequential([
+        Dense(64, input_dim=X_train.shape[1], activation='relu'),  # input + hidden layer, 64 neurons
+        Dropout(0.2),
+        Dense(32, activation='relu'),  # hidden layer, 32 neurons
+        Dropout(0.3),  # Dropout layer: 30% of activations randomly dropped
+        Dense(1, activation='linear')  # output layer, 1 neuron (regression)
+    ])
+
+    # Compile the model
+    model.compile(optimizer='adam', loss='mean_squared_error')
+    # Define EarlyStopping and ReduceLROnPlateau callbacks
+    early_stopping = EarlyStopping(monitor='val_loss', patience=10, restore_best_weights=True, verbose=1)
+    reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=5, verbose=1)
+    # Train the model on the GPU
+    with tf.device('/GPU:1'):  # assumes a second GPU ('/GPU:1') is available
+        history = model.fit(X_train, y_train,
+                        epochs=100,
+                        batch_size=32,
+                        validation_data=(X_test, y_test),
+                        verbose=2,
+                        shuffle=False,
+                        callbacks=[early_stopping, reduce_lr])
+    draw_loss(history)
+    return model,feature_scaler_bytes,target_scaler_bytes
+
+
+@app.route('/model_training_bp', methods=['POST'])
+def model_training_bp():
+    # Record the program start time
+    start_time = time.time()  
+    result = {}
+    success = 0
+    args = {}  # ensure 'args' exists in the response even if parsing fails
+    print("Program starts execution!")
+    try:
+        args = request.values.to_dict()
+        print('args',args)
+        logger.info(args)
+        power_df = get_data_from_mongo(args)
+        model,feature_scaler_bytes,target_scaler_bytes = build_model(power_df,args)
+        insert_h5_model_into_mongo(model,feature_scaler_bytes,target_scaler_bytes ,args)
+        success = 1
+    except Exception as e:
+        my_exception = traceback.format_exc().replace("\n", "\t")  # flatten the traceback to one line
+        result['msg'] = my_exception
+    end_time = time.time() 
+   
+    result['success'] = success
+    result['args'] = args
+    result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
+    result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
+    print("Program execution ends!")
+    return result
+
+
+if __name__=="__main__":  
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("model_training_bp log")
+    from waitress import serve
+    serve(app, host="0.0.0.0", port=10103,threads=4)
+    print("server start!")

+ 2 - 1
requirements.txt

@@ -15,4 +15,5 @@ joblib==1.3.2
 tensorflow==2.2.0
 matplotlib==3.5.3
 Keras==2.3.1
-protobuf==3.20.3
+protobuf==3.20.3
+APScheduler==3.10.4

+ 4 - 2
run_all.py

@@ -17,8 +17,10 @@ services = [
     ("models_processing/model_predict/model_prediction_lstm.py", 10097),
     ("post_processing/post_processing.py", 10098),
     ("evaluation_processing/analysis.py", 10099),
-    ("models_processing/model_predict/res_prediction.py", 10100)
-
+    ("data_processing/data_operation/pre_data_ftp.py", 10101),
+    ("data_processing/data_operation/data_nwp_ftp.py", 10102),
+    ("models_processing/model_train/model_training_bp.py", 10103),
+    ("models_processing/model_predict/model_prediction_bp.py", 10104),
 ]
 
 # Get the root directory of the current script
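run_all.py's launch logic is outside this hunk; a plausible sketch of how the registered (script, port) pairs could be started, assuming each script is simply run as its own process (the real file may differ):

```python
import os
import subprocess

root = os.path.dirname(os.path.abspath(__file__))  # root directory of run_all.py
services = [
    ("models_processing/model_train/model_training_bp.py", 10103),
    ("models_processing/model_predict/model_prediction_bp.py", 10104),
]
# Each service binds its own hard-coded port, so the port here is informational only
procs = [subprocess.Popen(["python", os.path.join(root, script)])
         for script, port in services]
for p in procs:
    p.wait()
```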