Browse Source

Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei 1 month ago
parent
commit
1b20009463

+ 10 - 10
data_processing/data_operation/data_handler.py

@@ -18,12 +18,12 @@ class DataHandler(object):
         self.logger = logger
         self.opt = argparse.Namespace(**args)
 
-    def get_train_data(self, dfs, col_time, features, target):
+    def get_train_data(self, dfs, col_time, target):
         train_x, valid_x, train_y, valid_y = [], [], [], []
         for i, df in enumerate(dfs, start=1):
             if len(df) < self.opt.Model["time_step"]:
                 self.logger.info("特征处理-训练数据-不满足time_step")
-            datax, datay = self.get_timestep_features(df, col_time, features, target, is_train=True)
+            datax, datay = self.get_timestep_features(df, col_time, target, is_train=True)
             if len(datax) < 10:
                 self.logger.info("特征处理-训练数据-无法进行最小分割")
                 continue
@@ -41,18 +41,18 @@ class DataHandler(object):
 
         return train_x, valid_x, train_y, valid_y
 
-    def get_predict_data(self, dfs, features):
+    def get_predict_data(self, dfs):
         test_x = []
         for i, df in enumerate(dfs, start=1):
             if len(df) < self.opt.Model["time_step"]:
                 self.logger.info("特征处理-预测数据-不满足time_step")
                 continue
-            datax = self.get_predict_features(df, features)
+            datax = self.get_predict_features(df)
             test_x.append(datax)
         test_x = np.concatenate(test_x, axis=0)
         return test_x
 
-    def get_predict_features(self, norm_data, features):
+    def get_predict_features(self, norm_data):
         """
         均分数据,获取预测数据集
         """
@@ -61,14 +61,14 @@ class DataHandler(object):
         time_step_loc = time_step - 1
         iters = int(len(feature_data)) // self.opt.Model['time_step']
         end = int(len(feature_data)) % self.opt.Model['time_step']
-        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, features].reset_index(drop=True) for i in range(iters)])
+        features_x = np.array([feature_data.loc[i*time_step:i*time_step + time_step_loc, self.opt.features].reset_index(drop=True) for i in range(iters)])
         if end > 0:
             df = feature_data.tail(end)
             df_repeated = pd.concat([df] + [pd.DataFrame([df.iloc[0]]* (time_step-end))]).reset_index(drop=True)
             features_x = np.concatenate((features_x, np.expand_dims(df_repeated, 0)), axis=0)
         return features_x
 
-    def get_timestep_features(self, norm_data, col_time, features, target, is_train):
+    def get_timestep_features(self, norm_data, col_time, target, is_train):
         """
         步长分割数据,获取时序训练集
         """
@@ -77,7 +77,7 @@ class DataHandler(object):
         time_step_loc = time_step - 1
         train_num = int(len(feature_data))
         label_features = [col_time, target] if is_train is True else [col_time, target]
-        nwp_cs = features
+        nwp_cs = self.opt.features
         nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # 数据库字段 'C_T': 'C_WS170'
         labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
         features_x, features_y = [], []
@@ -214,7 +214,7 @@ class DataHandler(object):
             train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[self.opt.features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
             train_x, valid_x, train_y, valid_y =  np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
         else:
-            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, self.opt.features, target)
+            train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, target)
         return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap
 
     def pre_data_handler(self, data, feature_scaler, bp_data=False):
@@ -241,5 +241,5 @@ class DataHandler(object):
         if bp_data:
             pre_x = np.array(pre_data)
         else:
-            pre_x = self.get_predict_data([pre_data], features)
+            pre_x = self.get_predict_data([pre_data])
         return pre_x, data

+ 1 - 1
models_processing/model_tf/bp.yaml

@@ -1,5 +1,5 @@
 Model:
-  add_train: false
+  add_train: true
   batch_size: 64
   dropout_rate: 0.2
   epoch: 200

+ 1 - 1
models_processing/model_tf/cnn.yaml

@@ -1,5 +1,5 @@
 Model:
-  add_train: false
+  add_train: true
   batch_size: 64
   dropout_rate: 0.2
   epoch: 200

+ 1 - 1
models_processing/model_tf/lstm.yaml

@@ -1,5 +1,5 @@
 Model:
-  add_train: false
+  add_train: true
   batch_size: 64
   dropout_rate: 0.2
   epoch: 200

+ 0 - 27
models_processing/model_tf/test.py

@@ -1,27 +0,0 @@
-#!/usr/bin/env python
-# -*- coding:utf-8 -*-
-# @FileName  :test.py
-# @Time      :2025/3/5 18:11
-# @Author    :David
-# @Company: shenyang JY
-import numpy as np
-# import tensorflow as tf
-# print("TensorFlow 版本:", tf.__version__)
-# print("GPU 是否可用:", tf.config.list_physical_devices('GPU'))
-
-# 生成三个形状为 (2, 3, 4) 的三维数组
-
-
-# arrays = [np.random.rand(2, 3, 4) for _ in range(3)]
-#
-# # 沿新维度 axis=0 堆叠
-# stacked = np.stack(arrays, axis=2)
-#
-# print("堆叠后的形状:", stacked.shape)
-
-arrays = [np.random.rand(3, 3) for _ in range(2)]
-
-# 沿新维度 axis=0 堆叠
-stacked = np.stack(arrays, axis=0)
-
-print("堆叠后的形状:", stacked.shape)

+ 1 - 1
models_processing/model_tf/test.yaml

@@ -1,5 +1,5 @@
 Model:
-  add_train: false
+  add_train: true
   batch_size: 64
   dropout_rate: 0.2
   epoch: 200

+ 1 - 1
models_processing/model_tf/tf_bp_pre.py

@@ -60,7 +60,7 @@ def model_prediction_bp():
         bp.opt.cap = round(target_scaler.transform(np.array([[float(args['cap'])]]))[0, 0], 2)
         # ------------ 获取模型,预测结果------------
         bp.get_model(args)
-        dh.opt.features = json.loads(bp.model_params).get('features', bp.opt.features)
+        dh.opt.features = json.loads(bp.model_params).get('Model').get('features', ','.join(bp.opt.features)).split(',')
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler, bp_data=True)
 
         res = list(chain.from_iterable(target_scaler.inverse_transform(bp.predict(scaled_pre_x))))

+ 10 - 7
models_processing/model_tf/tf_bp_train.py

@@ -55,17 +55,20 @@ def model_training_bp():
         # 1. 如果是加强训练模式,先加载预训练模型特征参数,再预处理训练数据
         # 2. 如果是普通模式,先预处理训练数据,再根据训练数据特征加载模型
         model = bp.train_init() if bp.opt.Model['add_train'] else bp.get_keras_model(bp.opt)
-        if bp.opt.Model['add_train'] and model is not False:
-            feas = json.loads(bp.model_params).get('features', args['features'])
-            if set(feas).issubset(set(dh.opt.features)):
-                dh.opt.features = list(feas)
-                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+        if bp.opt.Model['add_train']:
+            if model:
+                feas = json.loads(bp.model_params).get('features', args['features'])
+                if set(feas).issubset(set(dh.opt.features)):
+                    dh.opt.features = list(feas)
+                    train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+                else:
+                    model = bp.get_keras_model(bp.opt)
+                    logger.info("训练数据特征,不满足,加强训练模型特征")
             else:
                 model = bp.get_keras_model(bp.opt)
-                logger.info("训练数据特征,不满足,加强训练模型特征")
-
         bp_model = bp.training(model, [train_x, train_y, valid_x, valid_y])
         # ------------ 保存模型 ------------
+        args['Model']['features'] = ','.join(dh.opt.features)
         args['params'] = json.dumps(args)
         args['descr'] = '测试'
         args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

+ 1 - 1
models_processing/model_tf/tf_cnn_pre.py

@@ -60,7 +60,7 @@ def model_prediction_bp():
         cnn.opt.cap = round(target_scaler.transform(np.array([[float(args['cap'])]]))[0, 0], 2)
 
         cnn.get_model(args)
-        dh.opt.features = json.loads(cnn.model_params).get('features', cnn.opt.features)
+        dh.opt.features = json.loads(cnn.model_params).get('Model').get('features', ','.join(cnn.opt.features)).split(',')
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
         logger.info("---------cap归一化:{}".format(cnn.opt.cap))
 

+ 10 - 8
models_processing/model_tf/tf_cnn_train.py

@@ -57,17 +57,19 @@ def model_training_bp():
         # 2. 如果是普通模式,先预处理训练数据,再根据训练数据特征加载模型
         logger.info("---------cap归一化:{}".format(cnn.opt.cap))
         model = cnn.train_init() if cnn.opt.Model['add_train'] else cnn.get_keras_model(cnn.opt)
-        if cnn.opt.Model['add_train'] and model is not False:
-            feas = json.loads(cnn.model_params).get('features', args['features'])
-            if set(feas).issubset(set(dh.opt.features)):
-                dh.opt.features = list(feas)
-                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+        if cnn.opt.Model['add_train']:
+            if model:
+                feas = json.loads(cnn.model_params).get('features', args['features'])
+                if set(feas).issubset(set(dh.opt.features)):
+                    dh.opt.features = list(feas)
+                    train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+                else:
+                    model = cnn.get_keras_model(cnn.opt)
+                    logger.info("训练数据特征,不满足,加强训练模型特征")
             else:
                 model = cnn.get_keras_model(cnn.opt)
-                logger.info("训练数据特征,不满足,加强训练模型特征")
-
         bp_model = cnn.training(model, [train_x, train_y, valid_x, valid_y])
-
+        args['Model']['features'] = ','.join(dh.opt.features)
         args['params'] = json.dumps(args)
         args['descr'] = '测试'
         args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

+ 1 - 1
models_processing/model_tf/tf_lstm_pre.py

@@ -59,7 +59,7 @@ def model_prediction_bp():
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
         ts.opt.cap = round(target_scaler.transform(np.array([[float(args['cap'])]]))[0, 0], 2)
         ts.get_model(args)
-        dh.opt.features = json.loads(ts.model_params).get('features', ts.opt.features)
+        dh.opt.features = json.loads(ts.model_params).get('Model').get('features', ','.join(ts.opt.features)).split(',')
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
         res = list(chain.from_iterable(target_scaler.inverse_transform(ts.predict(scaled_pre_x))))
         pre_data['farm_id'] = args.get('farm_id', 'null')

+ 10 - 7
models_processing/model_tf/tf_lstm_train.py

@@ -54,16 +54,19 @@ def model_training_bp():
         # 1. 如果是加强训练模式,先加载预训练模型特征参数,再预处理训练数据
         # 2. 如果是普通模式,先预处理训练数据,再根据训练数据特征加载模型
         model = ts.train_init() if ts.opt.Model['add_train'] else ts.get_keras_model(ts.opt)
-        if ts.opt.Model['add_train'] and model is not False:
-            feas = json.loads(ts.model_params).get('features', args['features'])
-            if set(feas).issubset(set(dh.opt.features)):
-                dh.opt.features = list(feas)
-                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+        if ts.opt.Model['add_train']:
+            if model:
+                feas = json.loads(ts.model_params).get('features', args['features'])
+                if set(feas).issubset(set(dh.opt.features)):
+                    dh.opt.features = list(feas)
+                    train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(train_data)
+                else:
+                    model = ts.get_keras_model(ts.opt)
+                    logger.info("训练数据特征,不满足,加强训练模型特征")
             else:
                 model = ts.get_keras_model(ts.opt)
-                logger.info("训练数据特征,不满足,加强训练模型特征")
         ts_model = ts.training(model, [train_x, train_y, valid_x, valid_y])
-        args['features'] = dh.opt.features
+        args['Model']['features'] = ','.join(dh.opt.features)
         args['params'] = json.dumps(args)
         args['descr'] = '测试'
         args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))

+ 1 - 1
models_processing/model_tf/tf_test_pre.py

@@ -60,7 +60,7 @@ def model_prediction_test():
         ts.opt.cap = round(target_scaler.transform(np.array([[float(args['cap'])]]))[0, 0], 2)
 
         ts.get_model(args)
-        dh.opt.features = json.loads(ts.model_params).get('features', ts.opt.features)
+        dh.opt.features = json.loads(ts.model_params).get('Model').get('features', ','.join(ts.opt.features)).split(',')
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
 
         res = list(chain.from_iterable(target_scaler.inverse_transform(ts.predict(scaled_pre_x))))

+ 11 - 8
models_processing/model_tf/tf_test_train.py

@@ -54,17 +54,20 @@ def model_training_test():
         # 1. 如果是加强训练模式,先加载预训练模型特征参数,再预处理训练数据
         # 2. 如果是普通模式,先预处理训练数据,再根据训练数据特征加载模型
         model = ts.train_init() if ts.opt.Model['add_train'] else ts.get_keras_model(ts.opt)
-        if ts.opt.Model['add_train'] and model is not False:
-            feas = json.loads(ts.model_params).get('features', args['features'])
-            if set(feas).issubset(set(dh.opt.features)):
-                dh.opt.features = list(feas)
-                train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(
-                    train_data)
+        if ts.opt.Model['add_train']:
+            if model:
+                feas = json.loads(ts.model_params).get('features', args['features'])
+                if set(feas).issubset(set(dh.opt.features)):
+                    dh.opt.features = list(feas)
+                    train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes, scaled_cap = dh.train_data_handler(
+                        train_data)
+                else:
+                    model = ts.get_keras_model(ts.opt)
+                    logger.info("训练数据特征,不满足,加强训练模型特征")
             else:
                 model = ts.get_keras_model(ts.opt)
-                logger.info("训练数据特征,不满足,加强训练模型特征")
-
         ts_model = ts.training(model, [train_x, train_y, valid_x, valid_y])
+        args['Model']['features'] = ','.join(dh.opt.features)
         args['params'] = json.dumps(args)
         args['descr'] = '测试'
         args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))