Parcourir la source

fix 修改日志和无权重参数报错

hzh il y a 2 semaines
Parent
commit
e626ddc6b6

+ 25 - 0
common/log_utils.py

@@ -0,0 +1,25 @@
+# log_utils.py
+import logging
+from io import StringIO
+from flask import g
+
+
+def init_request_logging(logger):  # capture records emitted through `logger` into a buffer stored on flask.g
+    # Create an in-memory buffer
+    g.log_stream = StringIO()
+
+    # Create a custom log handler that writes into that buffer
+    handler = logging.StreamHandler(g.log_stream)
+    handler.setFormatter(logging.Formatter(
+        '%(asctime)s [%(levelname)s] %(message)s'
+    ))
+
+    # Temporarily add it to the logger (removed again in teardown_request_logging)
+    g.log_handler = handler
+    logger.addHandler(handler)  # NOTE(review): if `logger` is shared by concurrent requests, this handler may also receive their records - confirm
+
+
+def teardown_request_logging(response, logger):  # detach the capture handler and hand `response` back unchanged
+    if hasattr(g, 'log_handler'):  # attribute is only set by init_request_logging
+        logger.removeHandler(g.log_handler)
+    return response

+ 18 - 7
models_processing/model_predict/model_prediction_ml.py

@@ -1,7 +1,7 @@
 import pandas as pd
 from pymongo import MongoClient
 import pickle
-from flask import Flask, request
+from flask import Flask, request, g
 import time
 import logging
 import traceback
@@ -10,11 +10,20 @@ from common.alert import send_message
 from datetime import datetime, timedelta
 import pytz
 from pytz import timezone
-
+from common.log_utils import init_request_logging, teardown_request_logging
 from common.processing_data_common import get_xxl_dq
 
 app = Flask('model_prediction_ml——service')
 
+@app.before_request
+def setup_logging():
+    init_request_logging(logger)
+
+# 请求后清理日志处理器
+@app.after_request
+def teardown_logging(response):
+    return teardown_request_logging(response, logger)
+
 def str_to_list(arg):
     if arg == '':
         return []
@@ -40,6 +49,7 @@ def forecast_data_distribution(pre_data, args):
     pre_data = pre_data.rename(columns=field_mapping)
 
     if len(pre_data) == 0:
+        logging.info("nwp dataframe is empty")
         send_message('lightgbm预测组件', farm_id, '请注意:获取NWP数据为空,预测文件无法生成!')
         result = get_xxl_dq(farm_id, dt)
 
@@ -54,7 +64,7 @@ def forecast_data_distribution(pre_data, args):
         db = client[mongodb_database]
         collection = db[mongodb_model_table]
         model_data = collection.find_one({"model_name": model_name})
-        print(model_data.keys())
+        logger.info(f"use {model_data['model_name']} features: {model_data['features']}")
         if model_data is not None:
             model_binary = model_data['model']  # 确保这个字段是存储模型的二进制数据
             # 反序列化模型
@@ -70,7 +80,7 @@ def forecast_data_distribution(pre_data, args):
             else:
                 df['power_forecast'] = model.predict(df[features])
                 df.loc[df['power_forecast'] < 0, 'power_forecast'] = 0
-                print("model predict result  successfully!")
+                logger.info("model predict result successfully!")
                 if 'farm_id' not in df.columns:
                     df['farm_id'] = farm_id
                 result = df[['farm_id', 'date_time', 'power_forecast']]
@@ -91,6 +101,7 @@ def model_prediction(df, args):
     if 'is_limit' in df.columns:
         df = df[df['is_limit'] == False]
 
+    logger.info(f"use {model_data['model_name']} features: {model_data['features']}")
     if model_data is not None:
         model_binary = model_data['model']  # 确保这个字段是存储模型的二进制数据
         # 反序列化模型
@@ -105,7 +116,7 @@ def model_prediction(df, args):
         df['model'] = model_name
         df['howLongAgo'] = howLongAgo
         df['farm_id'] = farm_id
-        print("model predict result  successfully!")
+        logger.info("model predict result successfully!")
 
     return df[['dateTime', 'howLongAgo', 'model', 'farm_id', 'power_forecast', target]]
 
@@ -129,12 +140,12 @@ def model_prediction_ml():
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()
-        my_exception.replace("\n", "\t")
-        result['msg'] = my_exception
+        logger.error(my_exception)
     end_time = time.time()
 
     result['success'] = success
     result['args'] = args
+    result['log'] = result['log'] = g.log_stream.getvalue().splitlines()
     result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
     result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
     print("Program execution ends!")

+ 25 - 6
models_processing/model_train/model_training_ml.py

@@ -2,7 +2,7 @@ import lightgbm as lgb
 import numpy as np
 from sklearn.model_selection import train_test_split
 from sklearn.metrics import mean_squared_error, mean_absolute_error
-from flask import Flask, request
+from flask import Flask, request, g
 import time
 import traceback
 import logging
@@ -12,19 +12,37 @@ from sklearn.pipeline import Pipeline
 from sklearn.svm import SVR
 from sklearn.preprocessing import MinMaxScaler
 from data_processing.data_operation.weight import WEIGHT_REGISTER
+from io import StringIO
+from common.log_utils import init_request_logging, teardown_request_logging
 
 app = Flask('model_training_ml——service')
 
+
+# 请求前设置日志捕获
+@app.before_request
+def setup_logging():
+    init_request_logging(logger)
+
+# 请求后清理日志处理器
+@app.after_request
+def teardown_logging(response):
+    return teardown_request_logging(response, logger)
+
 def get_sample_weight(df, label, args):
     # 样本权重
     if 'sample_weight' in args.keys():
         if args['sample_weight'] in WEIGHT_REGISTER.keys():
             sample_weight = WEIGHT_REGISTER[args['sample_weight']](df[label].values.reshape(-1), **args)
+            logger.info(f"use predefined weights {args['sample_weight']}")
         elif args['sample_weight'] in df.columns.tolist():
             sample_weight = df[args['sample_weight']].values.reshape(-1)
+            logger.info(f'use dataframe col {args["sample_weight"]}')
         else:
             sample_weight = None
-            print('sample_weight is neither in the predefined weights nor a column of the DataFrame, not applicable')
+            logger.info('sample_weight is neither in the predefined weights nor a column of the DataFrame, not applicable')
+    else:
+        sample_weight = None
+        logger.info('no sample_weight')
     return sample_weight
 
 def train_lgb(data_split, categorical_features, model_params, num_boost_round=100, sample_weight=None):
@@ -90,10 +108,12 @@ def build_model(df, args):
     model_type = args['model_type']
     # 区分常规机器学习模型和lgb,这里只实例化svr,后续可扩展
     if model_type == "lightgbm":
+        logger.info("lightgbm training")
         num_boost_round = int(args['num_boost_round'])
         model, y_pred = train_lgb([X_train, X_test, y_train, y_test], categorical_features, model_params,
                                   num_boost_round, sample_weight=sample_weight)
     elif model_type == "svr":
+        logger.info("svr training")
         model, y_pred = train_svr([X_train, X_test, y_train, y_test], model_params, sample_weight=sample_weight)
     else:
         raise ValueError(f"Invalid model_type, must be one of [lightgbm, svr]")
@@ -102,7 +122,7 @@ def build_model(df, args):
     mse = mean_squared_error(y_test, y_pred)
     rmse = np.sqrt(mse)
     mae = mean_absolute_error(y_test, y_pred)
-    print(f'The test rmse is: {rmse},"The test mae is:"{mae}')
+    logger.info(f'The test rmse is: {round(rmse, 2)},"The test mae is:"{round(mae, 2)}')
     return model, features
 
 
@@ -115,7 +135,6 @@ def model_training_ml():
     print("Program starts execution!")
     try:
         args = request.values.to_dict()
-        print('args', args)
         logger.info(args)
         power_df = get_data_from_mongo(args)
         model, features = build_model(power_df, args)
@@ -123,12 +142,12 @@ def model_training_ml():
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()
-        my_exception.replace("\n", "\t")
-        result['msg'] = my_exception
+        logger.error(my_exception)
     end_time = time.time()
 
     result['success'] = success
     result['args'] = args
+    result['log'] = g.log_stream.getvalue().splitlines()
     result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
     result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
     print("Program execution ends!")

+ 21 - 10
post_processing/post_process.py

@@ -1,18 +1,29 @@
 import pandas as pd
-from flask import Flask, request, jsonify
+from flask import Flask, request, jsonify, g
 import time
 import logging
 import traceback
-
+from io import StringIO
 from common.database_dml import get_data_from_mongo, insert_data_into_mongo
-
+from common.log_utils import init_request_logging, teardown_request_logging
 app = Flask('post_process——service')
 
+# 请求前设置日志捕获
+@app.before_request
+def setup_logging():
+    init_request_logging(logger)
+
+
+# 请求后清理日志处理器
+@app.after_request
+def teardown_logging(response):
+    return teardown_request_logging(response, logger)
+
 def get_data(args):
     df = get_data_from_mongo(args)
     col_time = args['col_time']
     if not df.empty:
-        print("预测数据加载成功!")
+        logger.info(f"{args['mongodb_read_table']} load success")
         df[col_time] = pd.to_datetime(df[col_time])
         df.set_index(col_time, inplace=True)
         df.sort_index(inplace=True)
@@ -39,7 +50,7 @@ def predict_result_adjustment(df, args):
     df_cp = df.copy()
     df_cp['power_forecast'] = df_cp['power_forecast'].rolling(window=smooth_window, min_periods=1,
                                                               center=True).mean().clip(0, 0.985 * cap)
-    print("smooth processed")
+    logger.info(f"smooth processed windows: {smooth_window}")
 
     # 光伏晚上置零
     if plant_type == 'solar' and 'mongodb_nwp_table' in args.keys():
@@ -52,9 +63,9 @@ def predict_result_adjustment(df, args):
 
         df_cp = df_cp.join(nwp['radiation'])
         df_cp.loc[nwp['radiation'] == 0, 'power_forecast'] = 0
-        df_cp['power_forecast'] = round(df_cp['power_forecast'], 2)
         df_cp.drop(columns=['radiation'], inplace=True)
-        print("solar processed")
+        logger.info("solar processed")
+    df_cp['power_forecast'] = round(df_cp['power_forecast'], 2)
     df_cp.reset_index(inplace=True)
     df_cp[col_time] = df_cp[col_time].dt.strftime('%Y-%m-%d %H:%M:%S')
     return df_cp
@@ -77,11 +88,11 @@ def data_join():
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()
-        my_exception.replace("\n", "\t")
-        result['msg'] = my_exception
+        logger.error(my_exception)
     end_time = time.time()
     result['success'] = success
     result['args'] = args
+    result['log'] = g.log_stream.getvalue().splitlines()
     result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
     result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
     print("Program execution ends!")
@@ -91,7 +102,7 @@ def data_join():
 if __name__ == "__main__":
     print("Program starts execution!")
     logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
-    logger = logging.getLogger("post_processing")
+    logger = logging.getLogger("post_process")
     from waitress import serve
 
     serve(app, host="0.0.0.0", port=10130)

+ 43 - 43
run_all.py

@@ -2,49 +2,49 @@ import subprocess
 import os
 # 定义要启动的应用及其路径和端口
 services = [
-    ("data_processing/data_operation/data_join.py", 10094),
-    ("data_processing/data_operation/mysql_to_mongo.py", 10095),
-    ("data_processing/data_operation/pre_prod_ftp.py", 10118),
-    ("data_processing/processing_limit_power/processing_limit_power_by_agcavc.py", 10086),
-    ("data_processing/processing_limit_power/processing_limit_power_by_machines.py", 10087),
-    ("data_processing/processing_limit_power/processing_limit_power_by_records.py", 10088),
-    ("data_processing/processing_limit_power/processing_limit_power_by_statistics_light.py", 10085),
-    ("data_processing/processing_limit_power/processing_limit_power_by_statistics_wind.py", 10093),
-    ("data_processing/data_operation/pre_prod_ftp.py", '_'),
-    ("evaluation_processing/analysis_report.py", 10092),
-    ("evaluation_processing/evaluation_accuracy.py", 10091),
-    ("evaluation_processing/analysis_cdq.py", 10108),
-    ("models_processing/model_train/model_training_lightgbm.py", 10089),
-    ("models_processing/model_predict/model_prediction_lightgbm.py", 10090),
-    ("models_processing/model_train/model_training_lstm.py", 10096),
-    ("models_processing/model_predict/model_prediction_lstm.py", 10097),
-    ("models_processing/model_tf/tf_bp_pre.py", 10110),
-    ("models_processing/model_tf/tf_bp_train.py", 10111),
-    ("models_processing/model_tf/tf_cnn_pre.py", 10112),
-    ("models_processing/model_tf/tf_cnn_train.py", 10113),
-    ("models_processing/model_tf/tf_lstm_pre.py", 10114),
-    ("models_processing/model_tf/tf_lstm_train.py", 10115),
-    ("models_processing/model_tf/tf_test_pre.py", 10116),
-    ("models_processing/model_tf/tf_test_train.py", 10117),
-    ("models_processing/model_tf/tf_lstm2_pre.py", 10120),
-    ("models_processing/model_tf/tf_lstm2_train.py", 10119),
-    ("models_processing/model_tf/tf_lstm3_pre.py", 10122),
-    ("models_processing/model_tf/tf_lstm3_train.py", 10121),
-    ("models_processing/model_tf/tf_lstm_zone_pre.py", 10125),
-    ("models_processing/model_tf/tf_lstm_zone_train.py", 10124),
-
-    ("post_processing/post_processing.py", 10098),
-    ("evaluation_processing/analysis.py", 10099),
-    ("models_processing/model_predict/res_prediction.py", 10105),
-    ("data_processing/data_operation/pre_data_ftp.py", 10101),
-    ("data_processing/data_operation/data_nwp_ftp.py", 10102),
-    ("models_processing/model_train/model_training_bp.py", 10103),
-    ("models_processing/model_predict/model_prediction_bp.py", 10104),
-    ("data_processing/data_operation/data_tj_nwp_ftp.py", 10106),
-    ("post_processing/pre_post_processing.py", 10107),
-    ("post_processing/cdq_coe_gen.py", 10123),
-    ("models_processing/model_predict/model_prediction_photovoltaic_physical.py", 10126),
-    ("data_processing/data_operation/hive_to_mongo.py", 10127),
+    # ("data_processing/data_operation/data_join.py", 10094),
+    # ("data_processing/data_operation/mysql_to_mongo.py", 10095),
+    # ("data_processing/data_operation/pre_prod_ftp.py", 10118),
+    # ("data_processing/processing_limit_power/processing_limit_power_by_agcavc.py", 10086),
+    # ("data_processing/processing_limit_power/processing_limit_power_by_machines.py", 10087),
+    # ("data_processing/processing_limit_power/processing_limit_power_by_records.py", 10088),
+    # ("data_processing/processing_limit_power/processing_limit_power_by_statistics_light.py", 10085),
+    # ("data_processing/processing_limit_power/processing_limit_power_by_statistics_wind.py", 10093),
+    # ("data_processing/data_operation/pre_prod_ftp.py", '_'),
+    # ("evaluation_processing/analysis_report.py", 10092),
+    # ("evaluation_processing/evaluation_accuracy.py", 10091),
+    # ("evaluation_processing/analysis_cdq.py", 10108),
+    # ("models_processing/model_train/model_training_lightgbm.py", 10089),
+    # ("models_processing/model_predict/model_prediction_lightgbm.py", 10090),
+    # ("models_processing/model_train/model_training_lstm.py", 10096),
+    # ("models_processing/model_predict/model_prediction_lstm.py", 10097),
+    # ("models_processing/model_tf/tf_bp_pre.py", 10110),
+    # ("models_processing/model_tf/tf_bp_train.py", 10111),
+    # ("models_processing/model_tf/tf_cnn_pre.py", 10112),
+    # ("models_processing/model_tf/tf_cnn_train.py", 10113),
+    # ("models_processing/model_tf/tf_lstm_pre.py", 10114),
+    # ("models_processing/model_tf/tf_lstm_train.py", 10115),
+    # ("models_processing/model_tf/tf_test_pre.py", 10116),
+    # ("models_processing/model_tf/tf_test_train.py", 10117),
+    # ("models_processing/model_tf/tf_lstm2_pre.py", 10120),
+    # ("models_processing/model_tf/tf_lstm2_train.py", 10119),
+    # ("models_processing/model_tf/tf_lstm3_pre.py", 10122),
+    # ("models_processing/model_tf/tf_lstm3_train.py", 10121),
+    # ("models_processing/model_tf/tf_lstm_zone_pre.py", 10125),
+    # ("models_processing/model_tf/tf_lstm_zone_train.py", 10124),
+    #
+    # ("post_processing/post_processing.py", 10098),
+    # ("evaluation_processing/analysis.py", 10099),
+    # ("models_processing/model_predict/res_prediction.py", 10105),
+    # ("data_processing/data_operation/pre_data_ftp.py", 10101),
+    # ("data_processing/data_operation/data_nwp_ftp.py", 10102),
+    # ("models_processing/model_train/model_training_bp.py", 10103),
+    # ("models_processing/model_predict/model_prediction_bp.py", 10104),
+    # ("data_processing/data_operation/data_tj_nwp_ftp.py", 10106),
+    # ("post_processing/pre_post_processing.py", 10107),
+    # ("post_processing/cdq_coe_gen.py", 10123),
+    # ("models_processing/model_predict/model_prediction_photovoltaic_physical.py", 10126),
+    # ("data_processing/data_operation/hive_to_mongo.py", 10127),
 
     ("models_processing/model_train/model_training_ml.py", 10128),
     ("models_processing/model_predict/model_prediction_ml.py", 10129),