David преди 2 месеца
родител
ревизия
0d69fd8ef3
променени са 3 файла, в които са добавени 51 реда и са изтрити 55 реда
  1. 6 4
      data_processing/data_operation/data_handler.py
  2. 6 15
      models_processing/model_koi/tf_bp.py
  3. 39 36
      models_processing/model_koi/tf_bp_train.py

+ 6 - 4
data_processing/data_operation/data_handler.py

@@ -5,6 +5,8 @@
 # @Author    :David
 # @Company: shenyang JY
 import argparse, numbers, joblib
+
+import numpy as np
 import pandas as pd
 from io import BytesIO
 from bson.decimal128 import Decimal128
@@ -181,8 +183,7 @@ class DataHandler(object):
         # train_data = train_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
         # 对清洗完限电的数据进行特征预处理:1.空值异常值清洗 2.缺值补值
         train_data_cleaned = key_field_row_cleaning(train_data, features + [target], self.logger)
-        train_data_cleaned = train_data_cleaned.applymap(
-            lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        train_data_cleaned = train_data_cleaned.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
         # 创建特征和目标的标准化器
         train_scaler = MinMaxScaler(feature_range=(0, 1))
         target_scaler = MinMaxScaler(feature_range=(0, 1))
@@ -204,10 +205,11 @@ class DataHandler(object):
 
         if bp_data:
             train_data = pd.concat(train_datas, axis=0)
-            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data, scaled_target, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x, valid_x, train_y, valid_y =  np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
         else:
             train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
-        return train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes
+        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes
 
     def pre_data_handler(self, data, feature_scaler, opt, bp_data=False):
         """

+ 6 - 15
models_processing/model_koi/tf_bp.py

@@ -4,7 +4,7 @@
 # @Time      :2025/2/13 13:34
 # @Author    :David
 # @Company: shenyang JY
-
+from tensorflow.keras.models import Sequential
 from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
 from tensorflow.keras.models import Model, load_model
 from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
@@ -33,21 +33,12 @@ class BPHandler(object):
 
     @staticmethod
     def get_keras_model(opt):
-        # db_loss = NorthEastLoss(opt)
-        # south_loss = SouthLoss(opt)
-        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
-        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
-        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
-
-        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
-        d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
-        nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
-
-        output = Dense(1, name='d5')(nwp)
-        output_f = Flatten()(output)
-        model = Model(nwp_input, output_f)
+        model = Sequential([
+            Dense(64, input_dim=opt.Model['input_size'], activation='relu'),  # 输入层和隐藏层,10个神经元
+            Dense(32, activation='relu'),  # 隐藏层,8个神经元
+            Dense(1, activation='linear')  # 输出层,1个神经元(用于回归任务)
+        ])
         adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
-        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
         model.compile(loss=rmse, optimizer=adam)
         return model
 

+ 39 - 36
models_processing/model_koi/tf_bp_train.py

@@ -37,23 +37,26 @@ def model_training_bp():
     success = 0
     print("Program starts execution!")
     args_dict = request.values.to_dict()
-    args = arguments.deepcopy()
+    args_dict['features'] = args_dict['features'].split(',')
+    args = copy.deepcopy(arguments)
     args.update(args_dict)
-    try:
-        opt = argparse.Namespace(**args)
-        logger.info(args_dict)
-        train_data = get_data_from_mongo(args_dict)
-        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt, bp_data=True)
-        bp_model = bp.training(opt, [train_x, valid_x, train_y, valid_y])
-        args_dict['params'] = json.dumps(args)
-        args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-        insert_trained_model_into_mongo(bp_model, args_dict)
-        insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args)
-        success = 1
-    except Exception as e:
-        my_exception = traceback.format_exc()
-        my_exception.replace("\n", "\t")
-        result['msg'] = my_exception
+    # try:
+    opt = argparse.Namespace(**args)
+    logger.info(args_dict)
+    train_data = get_data_from_mongo(args_dict)
+    train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt, bp_data=True)
+    opt.Model['input_size'] = train_x.shape[1]
+    bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
+    args_dict['params'] = json.dumps(args)
+    args_dict['descr'] = '测试'
+    args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    insert_trained_model_into_mongo(bp_model, args_dict)
+    insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args)
+    success = 1
+    # except Exception as e:
+    #     my_exception = traceback.format_exc()
+    #     my_exception.replace("\n", "\t")
+    #     result['msg'] = my_exception
     end_time = time.time()
 
     result['success'] = success
@@ -70,23 +73,23 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=10103, threads=4)
-    print("server start!")
-    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
-    'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
-    'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    bp = BPHandler(logger)
-    opt = argparse.Namespace(**arguments)
-    opt.Model['input_size'] = len(opt.features)
-    train_data = get_data_from_mongo(args_dict)
-    train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt, bp_data=True)
-    bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
-
-    args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-    args_dict['params'] = arguments
-    args_dict['descr'] = '测试'
-    insert_trained_model_into_mongo(bp_model, args_dict)
-    insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)
+    serve(app, host="0.0.0.0", port=10103, threads=4)
+    # print("server start!")
+    # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+    # 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
+    # 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # bp = BPHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    # opt.Model['input_size'] = len(opt.features)
+    # train_data = get_data_from_mongo(args_dict)
+    # train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt, bp_data=True)
+    # bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
+    #
+    # args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    # args_dict['params'] = arguments
+    # args_dict['descr'] = '测试'
+    # insert_trained_model_into_mongo(bp_model, args_dict)
+    # insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)