David пре 2 месеци
родитељ
комит
310bf6a98e

+ 5 - 6
common/database_dml.py

@@ -221,7 +221,7 @@ def insert_scaler_model_into_mongo(feature_scaler_bytes, scaled_target_bytes, ar
     print("scaler_model inserted successfully!")
 
 
-def get_h5_model_from_mongo(args):
+def get_h5_model_from_mongo(args, custom=None):
     mongodb_connection,mongodb_database,model_table,model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['model_table'],args['model_name']
     client = MongoClient(mongodb_connection)
     # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
@@ -237,7 +237,7 @@ def get_h5_model_from_mongo(args):
         # 从缓冲区加载模型
          # 使用 h5py 和 BytesIO 从内存中加载模型
         with h5py.File(model_buffer, 'r') as f:
-            model = tf.keras.models.load_model(f)
+            model = tf.keras.models.load_model(f, custom_objects=custom)
         print(f"{model_name}模型成功从 MongoDB 加载!")
         client.close()
         return model
@@ -247,9 +247,8 @@ def get_h5_model_from_mongo(args):
         return None
 
 
-def get_scaler_model_from_mongo(args, feature_scaler=False):
-    mongodb_connection, mongodb_database, scaler_table, = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
-                                                           args['mongodb_database'], args['scaler_table'])
+def get_scaler_model_from_mongo(args, only_feature_scaler=False):
+    mongodb_connection, mongodb_database, scaler_table, = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", args['mongodb_database'], args['scaler_table'])
     client = MongoClient(mongodb_connection)
     # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
     db = client[mongodb_database]
@@ -260,7 +259,7 @@ def get_scaler_model_from_mongo(args, feature_scaler=False):
 
     feature_scaler_bytes = BytesIO(scaler_doc["feature_scaler"])
     feature_scaler = joblib.load(feature_scaler_bytes)
-    if feature_scaler:
+    if only_feature_scaler:
         return feature_scaler
     target_scaler_bytes = BytesIO(scaler_doc["target_scaler"])
     target_scaler = joblib.load(target_scaler_bytes)

+ 12 - 13
data_processing/data_operation/data_handler.py

@@ -37,8 +37,6 @@ class DataHandler(object):
 
         train_x = np.array([x.values for x in train_x])
         valid_x = np.array([x.values for x in valid_x])
-        # train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
-        # valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]
 
         return train_x, valid_x, train_y, valid_y
 
@@ -50,8 +48,7 @@ class DataHandler(object):
                 continue
             datax = self.get_predict_features(df, features)
             test_x.extend(datax)
-
-        test_x = [np.array([x[0].values for x in test_x]), np.array([x[1].values for x in test_x])]
+        test_x = np.array(test_x)
         return test_x
 
     def get_predict_features(self, norm_data, features):
@@ -61,7 +58,7 @@ class DataHandler(object):
         time_step = self.opt.Model["time_step"]
         feature_data = norm_data.reset_index(drop=True)
         time_step_loc = time_step - 1
-        iters = int(len(feature_data)) / self.opt.Model['time_step']
+        iters = int(len(feature_data)) // self.opt.Model['time_step']
         features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, features].reset_index(drop=True) for i in range(iters)])
         return features_x
 
@@ -186,7 +183,7 @@ class DataHandler(object):
         target_scaler = MinMaxScaler(feature_range=(0, 1))
         # 标准化特征和目标
         scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features])
-        scaled_target = target_scaler.fit_transform(train_data[[target]])
+        scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
         train_data_cleaned[features] = scaled_train_data
         train_data_cleaned[[target]] = scaled_target
         train_datas = self.fill_train_data(train_data_cleaned, col_time)
@@ -194,15 +191,15 @@ class DataHandler(object):
         scaled_train_bytes = BytesIO()
         scaled_target_bytes = BytesIO()
 
-        joblib.dump(scaled_train_data, scaled_train_bytes)
-        joblib.dump(scaled_target, scaled_target_bytes)
+        joblib.dump(train_scaler, scaled_train_bytes)
+        joblib.dump(target_scaler, scaled_target_bytes)
         scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
         scaled_target_bytes.seek(0)
 
         train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
         return train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes
 
-    def pre_data_handler(self, data, feature_scaler, args):
+    def pre_data_handler(self, data, feature_scaler, opt):
         """
         预测数据简单处理
         Args:
@@ -213,9 +210,11 @@ class DataHandler(object):
         """
         if 'is_limit' in data.columns:
             data = data[data['is_limit'] == False]
-        features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
-            args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
-        pre_data = data.sort_values(by=col_time)
+        # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
+        #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
+        col_time, features = opt.col_time, opt.features
+        pre_data = data.sort_values(by=col_time)[features]
         scaled_features = feature_scaler.transform(pre_data[features])
-        pre_x = self.get_predict_data([scaled_features], features)
+        pre_data[features] = scaled_features
+        pre_x = self.get_predict_data([pre_data], features)
         return pre_x

+ 3 - 3
models_processing/model_koi/tf_bp.py

@@ -10,6 +10,7 @@ from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Con
 from tensorflow.keras.models import Model, load_model
 from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
 from tensorflow.keras import optimizers, regularizers
+from models_processing.losses.loss_cdq import rmse
 import numpy as np
 from common.database_dml import *
 from threading import Lock
@@ -27,7 +28,7 @@ class BPHandler(object):
         try:
             with model_lock:
                 # NPHandler.model = NPHandler.get_keras_model(opt)
-                self.model = get_h5_model_from_mongo(args)
+                self.model = get_h5_model_from_mongo(args, {'rmse': rmse})
         except Exception as e:
             self.logger.info("加载模型权重失败:{}".format(e.args))
 
@@ -35,7 +36,6 @@ class BPHandler(object):
     def get_keras_model(opt):
         # db_loss = NorthEastLoss(opt)
         # south_loss = SouthLoss(opt)
-        from models_processing.losses.loss_cdq import rmse
         l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
         l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
         nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
@@ -56,7 +56,7 @@ class BPHandler(object):
         try:
             if opt.Model['add_train']:
                 # 进行加强训练,支持修模
-                base_train_model = get_h5_model_from_mongo(vars(opt))
+                base_train_model = get_h5_model_from_mongo(vars(opt), {'rmse': rmse})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:

+ 27 - 1
models_processing/model_koi/tf_bp_pre.py

@@ -68,4 +68,30 @@ def model_prediction_bp():
 
 
 if __name__ == "__main__":
-    run_code = 0
+    print("Program starts execution!")
+    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    logger = logging.getLogger("model_training_bp log")
+    from waitress import serve
+
+    # serve(app, host="0.0.0.0", port=1010x, threads=4)
+    print("server start!")
+    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+                 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time',
+                 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    args_dict['features'] = args_dict['features'].split(',')
+    arguments.update(args_dict)
+    dh = DataHandler(logger, arguments)
+    bp = BPHandler(logger)
+    opt = argparse.Namespace(**arguments)
+
+    opt.Model['input_size'] = len(opt.features)
+    pre_data = get_data_from_mongo(args_dict)
+    feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
+    pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt)
+    bp.get_model(arguments)
+    result = bp.predict(pre_x)
+
+    args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    args_dict['params'] = arguments
+    args_dict['descr'] = '测试'
+    insert_data_into_mongo(result, arguments)

+ 2 - 3
models_processing/model_koi/tf_bp_train.py

@@ -81,12 +81,11 @@ if __name__ == "__main__":
     opt = argparse.Namespace(**arguments)
     opt.Model['input_size'] = len(opt.features)
     train_data = get_data_from_mongo(args_dict)
-    train_x, valid_x, train_y, valid_y, scaled_train_bytes = dh.train_data_handler(train_data, opt)
-
+    train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
     bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
 
     args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
     args_dict['params'] = arguments
     args_dict['descr'] = '测试'
     insert_trained_model_into_mongo(bp_model, args_dict)
-    insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)
+    insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)

+ 10 - 0
models_processing/model_koi/tf_lstm.py

@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# -*- coding:utf-8 -*-
+# @FileName  :tf_lstm.py
+# @Time      :2025/2/12 14:03
+# @Author    :David
+# @Company: shenyang JY
+
+
+if __name__ == "__main__":
+    run_code = 0