
Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei, 1 month ago
Commit
0f5d487e38

+ 3 - 3
common/database_dml_koi.py

@@ -468,7 +468,7 @@ def get_keras_model_from_mongo(
         model_doc = collection.find_one(
             {"model_name": args['model_name']},
             sort=[('gen_time', DESCENDING)],
-            projection={"model_data": 1, "gen_time": 1}
+            projection={"model_data": 1, "gen_time": 1, 'params':1}
         )
 
         if not model_doc:
@@ -477,7 +477,7 @@ def get_keras_model_from_mongo(
 
         # ------------------------- Memory-optimized loading -------------------------
         model_data = model_doc['model_data']
-
+        model_params = model_doc['params']
         # Create a temporary file (deleted automatically)
         with tempfile.NamedTemporaryFile(suffix=".keras", delete=False) as tmp_file:
             tmp_file.write(model_data)
@@ -490,7 +490,7 @@ def get_keras_model_from_mongo(
         )
 
         print(f"{args['model_name']} 模型成功从 MongoDB 加载!")
-        return model
+        return model, model_params
 
     except tf.errors.NotFoundError as e:
         print(f"❌ 模型结构缺失关键组件: {str(e)}")

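With this change, get_keras_model_from_mongo returns a (model, params) tuple instead of a bare model, so every call site below has to unpack two values. A minimal sketch of the new contract, assuming params is the JSON string written at save time (hypothetical caller, not from the repo):

    import json

    model, model_params = get_keras_model_from_mongo(args)
    # params round-trip as JSON; fall back to the request features if the
    # stored params carry no 'features' key
    features = json.loads(model_params).get('features', args['features'])

Note that model_doc['params'] is read unconditionally, so a document saved before this change (with no params field) would raise a KeyError inside the loader.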
+ 13 - 4
data_processing/data_operation/data_handler.py

@@ -102,6 +102,12 @@ class DataHandler(object):
             data_train = self.data_fill(data_train, col_time)
         return data_train
 
+    def fill_pre_data(self, unite):
+        unite = unite.interpolate(method='linear')  # linearly interpolate the NWP values first
+        unite = unite.fillna(method='ffill')  # then fill edge points beyond the sampling boundary that interpolation cannot reach
+        unite = unite.fillna(method='bfill')
+        return unite
+
     def missing_time_splite(self, df, dt_short, dt_long, col_time):
         df.reset_index(drop=True, inplace=True)
         n_long, n_short, n_points = 0, 0, 0
@@ -183,15 +189,16 @@ class DataHandler(object):
         # Feature preprocessing on the curtailment-cleaned data:
         # 1. Clean missing values and outliers
         train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time)
+        self.opt.features = [x for x in train_data_cleaned.columns.tolist() if x not in [target, col_time] and x in features]
         # 2. Normalization
         # Create scalers for the features and the target
         train_scaler = MinMaxScaler(feature_range=(0, 1))
         target_scaler = MinMaxScaler(feature_range=(0, 1))
         # Scale the features and the target
-        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features])
+        scaled_train_data = train_scaler.fit_transform(train_data_cleaned[self.opt.features])
         scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
         scaled_cap = target_scaler.transform(np.array([[self.opt.cap]]))[0,0]
-        train_data_cleaned[features] = scaled_train_data
+        train_data_cleaned[self.opt.features] = scaled_train_data
         train_data_cleaned[[target]] = scaled_target
         # 3. Fill missing values
         train_datas = self.fill_train_data(train_data_cleaned, col_time)
@@ -205,10 +212,10 @@ class DataHandler(object):
 
         if bp_data:
             train_data = pd.concat(train_datas, axis=0)
-            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[self.opt.features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
             train_x, valid_x, train_y, valid_y =  np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
         else:
-            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
+            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, self.opt.features, target)
         return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
 
     def pre_data_handler(self, data, feature_scaler, bp_data=False):
@@ -228,6 +235,8 @@ class DataHandler(object):
         col_time, features = self.opt.col_time, self.opt.features
         data = data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
         data = data.sort_values(by=col_time).reset_index(drop=True, inplace=False)
+        if self.opt.Model['predict_data_fill']:
+            data = self.fill_pre_data(data)
         pre_data = data[features]
         scaled_features = feature_scaler.transform(data[features])
         pre_data.loc[:, features] = scaled_features

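fill_pre_data interpolates interior gaps linearly and then forward/backward-fills the edge points that interpolation cannot reach. A toy demonstration of that order of operations (invented values; note that fillna(method=...) is deprecated in recent pandas, where .ffill()/.bfill() are the equivalent spellings):

    import numpy as np
    import pandas as pd

    unite = pd.DataFrame({'nwp': [np.nan, 1.0, np.nan, 3.0, np.nan]})
    unite = unite.interpolate(method='linear')  # interior gap -> 2.0
    unite = unite.ffill().bfill()               # leading/trailing gaps -> nearest valid value
    print(unite['nwp'].tolist())                # [1.0, 1.0, 2.0, 3.0, 3.0]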
+ 10 - 12
models_processing/model_tf/tf_bp.py

@@ -24,6 +24,7 @@ class BPHandler(object):
         self.logger = logger
         self.opt = argparse.Namespace(**args)
         self.model = None
+        self.model_params = None
 
     def get_model(self, args):
         """
@@ -32,7 +33,7 @@ class BPHandler(object):
         try:
             with model_lock:
                 # loss = region_loss(self.opt)
-                self.model = get_keras_model_from_mongo(args)
+                self.model, self.model_params = get_keras_model_from_mongo(args)
         except Exception as e:
             self.logger.info("Failed to load model weights: {}".format(e.args))
 
@@ -86,20 +87,17 @@ class BPHandler(object):
 
     def train_init(self):
         try:
-            if self.opt.Model['add_train']:
-                # Incremental training, with support for model revision
-                loss = region_loss(self.opt)
-                base_train_model = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
-                base_train_model.summary()
-                self.logger.info("Loaded the base model for incremental training")
-            else:
-                base_train_model = self.get_keras_model(self.opt)
+            # Incremental training, with support for model revision
+            loss = region_loss(self.opt)
+            base_train_model, self.model_params = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
+            base_train_model.summary()
+            self.logger.info("Loaded the base model for incremental training")
             return base_train_model
         except Exception as e:
-            self.logger.info("Failed to load model weights: {}".format(e.args))
+            self.logger.info("Failed to load incremental-training model weights: {}".format(e.args))
+            return False
 
-    def training(self, train_and_valid_data):
-        model = self.train_init()
+    def training(self, model, train_and_valid_data):
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         print("----------", np.array(train_x[0]).shape)
         print("++++++++++", np.array(train_x[1]).shape)

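training() no longer builds its own model internally: the caller now chooses between train_init() (incremental) and get_keras_model() (fresh) and passes the result in, and train_init() returns False when the base model cannot be loaded. A sketch of the resulting call pattern, with the handler construction assumed from the diff:

    bp = BPHandler(logger, args)  # constructor signature assumed
    model = bp.train_init() if bp.opt.Model['add_train'] else bp.get_keras_model(bp.opt)
    if model is False:
        raise RuntimeError("incremental-training base model could not be loaded")
    bp_model = bp.training(model, [train_x, train_y, valid_x, valid_y])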
+ 3 - 1
models_processing/model_tf/tf_bp_pre.py

@@ -53,10 +53,12 @@ def model_prediction_bp():
         # ------------ Fetch data and preprocess the prediction data ------------
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
-        scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler, bp_data=True)
         bp.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0, 0], 2)
         # ------------ Load the model and predict ------------
         bp.get_model(args)
+        dh.opt.features = json.loads(bp.model_params).get('features', args['features'])
+        scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler, bp_data=True)
+
         res = list(chain.from_iterable(target_scaler.inverse_transform(bp.predict(scaled_pre_x))))
         pre_data['farm_id'] = args.get('farm_id', 'null')
         if args.get('algorithm_test', 0):

+ 15 - 3
models_processing/model_tf/tf_bp_train.py

@@ -49,10 +49,22 @@ def model_training_bp():
         # ------------ Fetch data and preprocess the training data ------------
         train_data = get_data_from_mongo(args)
         train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data, bp_data=True)
-        # ------------ Train the model ------------
-        bp.opt.Model['input_size'] = train_x.shape[1]
         bp.opt.cap = round(scaled_cap, 2)
-        bp_model = bp.training([train_x, train_y, valid_x, valid_y])
+        bp.opt.Model['input_size'] = len(dh.opt.features)
+        # ------------ Train the model ------------
+        # 1. In incremental-training mode, load the pre-trained model's feature parameters first, then preprocess the training data
+        # 2. In normal mode, preprocess the training data first, then build the model from the data's features
+        model = bp.train_init() if bp.opt.Model['add_train'] else bp.get_keras_model(bp.opt)
+        if bp.opt.Model['add_train'] and model is not False:
+            feas = json.loads(bp.model_params).get('features', args['features'])
+            if set(feas).issubset(set(dh.opt.features)):
+                dh.opt.features = list(feas)
+                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+            else:
+                model = bp.get_keras_model(bp.opt)
+                logger.info("Training data features do not satisfy the incremental-training model's features")
+
+        bp_model = bp.training(model, [train_x, train_y, valid_x, valid_y])
         # ------------ 保存模型 ------------
         args['params'] = json.dumps(args)
         args['descr'] = 'test'

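The compatibility rule above (reuse the saved model's feature list only when the cleaned training data covers all of it, otherwise fall back to a fresh model) is the same in all three trainers. A standalone sketch of that rule as a helper (hypothetical function, not in the repo):

    import json

    def resolve_stored_features(model_params, data_features, fallback):
        """Return the saved feature list if the training data covers it, else None."""
        feas = json.loads(model_params).get('features', fallback)
        return list(feas) if set(feas).issubset(set(data_features)) else None

    # feats = resolve_stored_features(bp.model_params, dh.opt.features, args['features'])
    # feats is None -> build a new model with bp.get_keras_model(bp.opt)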
+ 9 - 11
models_processing/model_tf/tf_cnn.py

@@ -24,6 +24,7 @@ class CNNHandler(object):
         self.logger = logger
         self.opt = argparse.Namespace(**args)
         self.model = None
+        self.model_params = None
 
     def get_model(self, args):
         """
@@ -32,7 +33,7 @@ class CNNHandler(object):
         try:
             with model_lock:
                 loss = region_loss(self.opt)
-                self.model = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
+                self.model, self.model_params = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
         except Exception as e:
             self.logger.info("Failed to load model weights: {}".format(e.args))
 
@@ -58,20 +59,17 @@ class CNNHandler(object):
 
     def train_init(self):
         try:
-            if self.opt.Model['add_train']:
-                # Incremental training, with support for model revision
-                loss = region_loss(self.opt)
-                base_train_model = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
-                base_train_model.summary()
-                self.logger.info("Loaded the base model for incremental training")
-            else:
-                base_train_model = self.get_keras_model(self.opt)
+            # Incremental training, with support for model revision
+            loss = region_loss(self.opt)
+            base_train_model, self.model_params = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
+            base_train_model.summary()
+            self.logger.info("Loaded the base model for incremental training")
             return base_train_model
         except Exception as e:
             self.logger.info("Failed to load model weights: {}".format(e.args))
+            return False
 
-    def training(self, train_and_valid_data):
-        model = self.train_init()
+    def training(self, model, train_and_valid_data):
         # tf.reset_default_graph() # 清除默认图
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         print("----------", np.array(train_x[0]).shape)

+ 5 - 2
models_processing/model_tf/tf_cnn_pre.py

@@ -53,10 +53,13 @@ def model_prediction_bp():
     try:
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
+        cnn.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0, 0], 2)
+
+        cnn.get_model(args)
+        dh.opt.features = json.loads(cnn.model_params).get('features', args['features'])
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
-        cnn.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0,0], 2)
         logger.info("---------cap归一化:{}".format(cnn.opt.cap))
-        cnn.get_model(args)
+
         res = list(chain.from_iterable(target_scaler.inverse_transform(cnn.predict(scaled_pre_x))))
         pre_data['farm_id'] = args.get('farm_id', 'null')
         if args.get('algorithm_test', 0):

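Both prediction endpoints now push the raw capacity through the fitted target scaler so that cap lives in the same 0-1 space as the model outputs. A toy example of what that transform does, with a scaler fitted on invented values:

    import numpy as np
    from sklearn.preprocessing import MinMaxScaler

    target_scaler = MinMaxScaler(feature_range=(0, 1)).fit(np.array([[0.0], [200.0]]))
    cap = round(target_scaler.transform(np.array([[150.0]]))[0, 0], 2)
    print(cap)  # 0.75 -- 150 on a 0-200 target scale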
+ 14 - 3
models_processing/model_tf/tf_cnn_train.py

@@ -50,12 +50,23 @@ def model_training_bp():
         # ------------ Fetch data and preprocess the training data ------------
         train_data = get_data_from_mongo(args)
         train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
-        # ------------ Train and save the model ------------
-        cnn.opt.Model['input_size'] = train_x.shape[2]
+        cnn.opt.Model['input_size'] = len(dh.opt.features)
         cnn.opt.cap = round(scaled_cap, 2)
+        # ------------ Train and save the model ------------
+        # 1. In incremental-training mode, load the pre-trained model's feature parameters first, then preprocess the training data
+        # 2. In normal mode, preprocess the training data first, then build the model from the data's features
         logger.info("---------cap normalization: {}".format(cnn.opt.cap))
+        model = cnn.train_init() if cnn.opt.Model['add_train'] else cnn.get_keras_model(cnn.opt)
+        if cnn.opt.Model['add_train'] and model is not False:
+            feas = json.loads(cnn.model_params).get('features', args['features'])
+            if set(feas).issubset(set(dh.opt.features)):
+                dh.opt.features = list(feas)
+                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+            else:
+                model = cnn.get_keras_model(cnn.opt)
+                logger.info("Training data features do not satisfy the incremental-training model's features")
 
-        bp_model = cnn.training([train_x, train_y, valid_x, valid_y])
+        bp_model = cnn.training(model, [train_x, train_y, valid_x, valid_y])
 
         args['params'] = json.dumps(args)
         args['descr'] = 'test'

+ 10 - 12
models_processing/model_tf/tf_lstm.py

@@ -23,6 +23,7 @@ class TSHandler(object):
         self.logger = logger
         self.opt = argparse.Namespace(**args)
         self.model = None
+        self.model_params = None
 
     def get_model(self, args):
         """
@@ -31,7 +32,7 @@ class TSHandler(object):
         try:
             with model_lock:
                 loss = region_loss(self.opt)
-                self.model = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
+                self.model, self.model_params = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
         except Exception as e:
             self.logger.info("Failed to load model weights: {}".format(e.args))
 
@@ -55,20 +56,17 @@ class TSHandler(object):
 
     def train_init(self):
         try:
-            if self.opt.Model['add_train']:
-                # Incremental training, with support for model revision
-                loss = region_loss(self.opt)
-                base_train_model = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
-                base_train_model.summary()
-                self.logger.info("Loaded the base model for incremental training")
-            else:
-                base_train_model = self.get_keras_model(self.opt)
+            # Incremental training, with support for model revision
+            loss = region_loss(self.opt)
+            base_train_model, self.model_params = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
+            base_train_model.summary()
+            self.logger.info("Loaded the base model for incremental training")
             return base_train_model
         except Exception as e:
-            self.logger.info("Failed to load model weights: {}".format(e.args))
+            self.logger.info("Failed to load incremental-training model weights: {}".format(e.args))
+            return False
 
-    def training(self, train_and_valid_data):
-        model = self.train_init()
+    def training(self, model, train_and_valid_data):
         model.summary()
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')

+ 2 - 1
models_processing/model_tf/tf_lstm_pre.py

@@ -53,9 +53,10 @@ def model_prediction_bp():
     try:
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
-        scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
         ts.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0, 0], 2)
         ts.get_model(args)
+        dh.opt.features = json.loads(ts.model_params).get('features', args['features'])
+        scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
         res = list(chain.from_iterable(target_scaler.inverse_transform(ts.predict(scaled_pre_x))))
         pre_data['farm_id'] = args.get('farm_id', 'null')
         if args.get('algorithm_test', 0):

+ 15 - 4
models_processing/model_tf/tf_lstm_train.py

@@ -48,11 +48,22 @@ def model_training_bp():
         # ------------ Fetch data and preprocess the training data ------------
         train_data = get_data_from_mongo(args)
         train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
-        # ------------ Train and save the model ------------
-        ts.opt.Model['input_size'] = train_x.shape[2]
         ts.opt.cap = round(scaled_cap, 2)
-        ts_model = ts.training([train_x, train_y, valid_x, valid_y])
-
+        ts.opt.Model['input_size'] = len(dh.opt.features)
+        # ------------ Train and save the model ------------
+        # 1. In incremental-training mode, load the pre-trained model's feature parameters first, then preprocess the training data
+        # 2. In normal mode, preprocess the training data first, then build the model from the data's features
+        model = ts.train_init() if ts.opt.Model['add_train'] else ts.get_keras_model(ts.opt)
+        if ts.opt.Model['add_train'] and model is not False:
+            feas = json.loads(ts.model_params).get('features', args['features'])
+            if set(feas).issubset(set(dh.opt.features)):
+                dh.opt.features = list(feas)
+                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+            else:
+                model = ts.get_keras_model(ts.opt)
+                logger.info("Training data features do not satisfy the incremental-training model's features")
+        ts_model = ts.training(model, [train_x, train_y, valid_x, valid_y])
+        args['features'] = dh.opt.features
         args['params'] = json.dumps(args)
         args['descr'] = 'test'
         args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

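tf_lstm_train.py additionally writes the resolved feature list back into args before serializing, so the params saved next to the model record the features actually used in training, which is exactly what the prediction side reads back with json.loads. A round-trip sketch with invented values:

    import json

    args = {'features': ['ws10', 'ws100'], 'descr': 'test'}  # invented values
    args['params'] = json.dumps(args)  # stored alongside the model document
    restored = json.loads(args['params']).get('features', [])
    assert restored == ['ws10', 'ws100']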
+ 8 - 11
models_processing/model_tf/tf_test.py

@@ -24,6 +24,7 @@ class TSHandler(object):
         self.logger = logger
         self.opt = argparse.Namespace(**args)
         self.model = None
+        self.model_params = None
 
     def get_model(self, args):
         """
@@ -32,7 +33,7 @@ class TSHandler(object):
         try:
             with model_lock:
                 loss = region_loss(self.opt)
-                self.model = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
+                self.model, self.model_params = get_keras_model_from_mongo(args, {type(loss).__name__: loss})
         except Exception as e:
             self.logger.info("Failed to load model weights: {}".format(e.args))
 
@@ -120,20 +121,16 @@ class TSHandler(object):
 
     def train_init(self):
         try:
-            if self.opt.Model['add_train']:
-                # Incremental training, with support for model revision
-                loss = region_loss(self.opt)
-                base_train_model = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
-                base_train_model.summary()
-                self.logger.info("Loaded the base model for incremental training")
-            else:
-                base_train_model = self.get_keras_model(self.opt)
+            # Incremental training, with support for model revision
+            loss = region_loss(self.opt)
+            base_train_model, self.model_params = get_keras_model_from_mongo(vars(self.opt), {type(loss).__name__: loss})
+            base_train_model.summary()
+            self.logger.info("Loaded the base model for incremental training")
             return base_train_model
         except Exception as e:
             self.logger.info("Failed to load model weights: {}".format(e.args))
 
-    def training(self, train_and_valid_data):
-        model = self.train_init()
+    def training(self, model, train_and_valid_data):
         model.summary()
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         # 回调函数配置

+ 4 - 1
models_processing/model_tf/tf_test_pre.py

@@ -53,9 +53,12 @@ def model_prediction_test():
     try:
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
-        scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
         ts.opt.cap = round(target_scaler.transform(np.array([[args['cap']]]))[0, 0], 2)
+
         ts.get_model(args)
+        dh.opt.features = json.loads(ts.model_params).get('features', args['features'])
+        scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
+
         res = list(chain.from_iterable(target_scaler.inverse_transform(ts.predict(scaled_pre_x))))
         pre_data['farm_id'] = args.get('farm_id', 'null')
         if args.get('algorithm_test', 0):

+ 15 - 3
models_processing/model_tf/tf_test_train.py

@@ -48,11 +48,23 @@ def model_training_test():
         # ------------ Fetch data and preprocess the training data ------------
         train_data = get_data_from_mongo(args)
         train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
-        # ------------ Train and save the model ------------
-        ts.opt.Model['input_size'] = train_x.shape[2]
         ts.opt.cap = round(scaled_cap, 2)
-        ts_model = ts.training([train_x, train_y, valid_x, valid_y])
+        ts.opt.Model['input_size'] = len(dh.opt.features)
+        # ------------ Train and save the model ------------
+        # 1. In incremental-training mode, load the pre-trained model's feature parameters first, then preprocess the training data
+        # 2. In normal mode, preprocess the training data first, then build the model from the data's features
+        model = ts.train_init() if ts.opt.Model['add_train'] else ts.get_keras_model(ts.opt)
+        if ts.opt.Model['add_train'] and model is not False:
+            feas = json.loads(ts.model_params).get('features', args['features'])
+            if set(feas).issubset(set(dh.opt.features)):
+                dh.opt.features = list(feas)
+                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(
+                    train_data)
+            else:
+                model = ts.get_keras_model(ts.opt)
+                logger.info("Training data features do not satisfy the incremental-training model's features")
 
+        ts_model = ts.training(model, [train_x, train_y, valid_x, valid_y])
         args['params'] = json.dumps(args)
         args['descr'] = 'test'
         args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
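
Across all the trainers, input_size is now derived from the resolved feature list instead of the preprocessed tensor's shape, which lets the model be built before preprocessing has produced its arrays. The two definitions agree whenever the feature dimension is the last axis, as a quick sanity check shows (toy shapes):

    import numpy as np

    features = ['ws10', 'ws100', 'temp']         # invented feature list
    train_x = np.zeros((32, 16, len(features)))  # (samples, timesteps, features)
    assert train_x.shape[2] == len(features)     # len(features) == the old input_size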