David hace 4 semanas
padre
commit
6ca4814ad5

+ 12 - 9
app/model/tf_cnn_train.py

@@ -14,11 +14,11 @@ import time
 from app.common.tf_cnn import CNNHandler
 from app.common.dbmg import MongoUtils
 from app.common.logs import logger
+from copy import deepcopy
 np.random.seed(42)  # NumPy随机种子
 # tf.set_random_seed(42)  # TensorFlow随机种子
 
-dh = DataHandler(logger, params)
-cnn = CNNHandler(logger, params)
+
 mgUtils = MongoUtils(logger)
 
 def model_training(train_data, input_file, cap):
@@ -29,6 +29,9 @@ def model_training(train_data, input_file, cap):
     farm_id = input_file.split('/')[-2]
     output_file = input_file.replace('IN', 'OUT')
     status_file = 'STATUS.TXT'
+    local_params = deepcopy(params)
+    dh = DataHandler(logger, local_params)
+    cnn = CNNHandler(logger, local_params)
     try:
         # ------------ 获取数据,预处理训练数据 ------------
         dh.opt.cap = cap
@@ -56,16 +59,16 @@ def model_training(train_data, input_file, cap):
         # 更新算法状态:1. 启动成功
         write_number_to_file(os.path.join(output_file, status_file), 1, 1, 'rewrite')
         # ------------ 组装模型数据 ------------
-        params['Model']['features'] = ','.join(dh.opt.features)
-        params.update({
-            'params': json.dumps(params),
+        local_params['Model']['features'] = ','.join(dh.opt.features)
+        local_params.update({
+            'params': json.dumps(local_params),
             'descr': f'南网竞赛-{farm_id}',
             'gen_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
-            'model_table': params['model_table'] + farm_id,
-            'scaler_table': params['scaler_table'] + farm_id
+            'model_table': local_params['model_table'] + farm_id,
+            'scaler_table': local_params['scaler_table'] + farm_id
         })
-        mgUtils.insert_trained_model_into_mongo(ts_model, params)
-        mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, params)
+        mgUtils.insert_trained_model_into_mongo(ts_model, local_params)
+        mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, local_params)
         # 更新算法状态:正常结束
         write_number_to_file(os.path.join(output_file, status_file), 2, 2)
     except Exception as e:

+ 12 - 9
app/model/tf_fmi_train.py

@@ -14,11 +14,10 @@ import time
 from app.common.tf_fmi import FMIHandler
 from app.common.dbmg import MongoUtils
 from app.common.logs import logger
+from copy import deepcopy
 np.random.seed(42)  # NumPy随机种子
 # tf.set_random_seed(42)  # TensorFlow随机种子
 
-dh = DataHandler(logger, params)
-fmi = FMIHandler(logger, params)
 mgUtils = MongoUtils(logger)
 
 def model_training(train_data, input_file, cap):
@@ -29,6 +28,10 @@ def model_training(train_data, input_file, cap):
     farm_id = input_file.split('/')[-2]
     output_file = input_file.replace('IN', 'OUT')
     status_file = 'STATUS.TXT'
+    # 创建线程独立的实例
+    local_params = deepcopy(params)
+    dh = DataHandler(logger, local_params)
+    fmi = FMIHandler(logger, local_params)
     try:
         # ------------ 获取数据,预处理训练数据 ------------
         dh.opt.cap = cap
@@ -56,16 +59,16 @@ def model_training(train_data, input_file, cap):
         # 更新算法状态:1. 启动成功
         write_number_to_file(os.path.join(output_file, status_file), 1, 1, 'rewrite')
         # ------------ 组装模型数据 ------------
-        params['Model']['features'] = ','.join(dh.opt.features)
-        params.update({
-            'params': json.dumps(params),
+        local_params['Model']['features'] = ','.join(dh.opt.features)
+        local_params.update({
+            'params': json.dumps(local_params),
             'descr': f'南网竞赛-{farm_id}',
             'gen_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
-            'model_table': params['model_table'] + farm_id,
-            'scaler_table': params['scaler_table'] + farm_id
+            'model_table': local_params['model_table'] + farm_id,
+            'scaler_table': local_params['scaler_table'] + farm_id
         })
-        mgUtils.insert_trained_model_into_mongo(ts_model, params)
-        mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, params)
+        mgUtils.insert_trained_model_into_mongo(ts_model, local_params)
+        mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, local_params)
         # 更新算法状态:正常结束
         write_number_to_file(os.path.join(output_file, status_file), 2, 2)
     except Exception as e:

+ 12 - 9
app/model/tf_lstm_train.py

@@ -14,11 +14,10 @@ import time
 from app.common.tf_lstm import TSHandler
 from app.common.dbmg import MongoUtils
 from app.common.logs import logger
+from copy import deepcopy
 np.random.seed(42)  # NumPy随机种子
 # tf.set_random_seed(42)  # TensorFlow随机种子
 
-dh = DataHandler(logger, params)
-ts = TSHandler(logger, params)
 mgUtils = MongoUtils(logger)
 
 def model_training(train_data, input_file, cap):
@@ -29,6 +28,10 @@ def model_training(train_data, input_file, cap):
     farm_id = input_file.split('/')[-2]
     output_file = input_file.replace('IN', 'OUT')
     status_file = 'STATUS.TXT'
+    # 创建线程独立的实例
+    local_params = deepcopy(params)
+    dh = DataHandler(logger, local_params)
+    ts = TSHandler(logger, local_params)
     try:
         # ------------ 获取数据,预处理训练数据 ------------
         dh.opt.cap = cap
@@ -56,16 +59,16 @@ def model_training(train_data, input_file, cap):
         # 更新算法状态:1. 启动成功
         write_number_to_file(os.path.join(output_file, status_file), 1, 1, 'rewrite')
         # ------------ 组装模型数据 ------------
-        params['Model']['features'] = ','.join(dh.opt.features)
-        params.update({
-            'params': json.dumps(params),
+        local_params['Model']['features'] = ','.join(dh.opt.features)
+        local_params.update({
+            'params': json.dumps(local_params),
             'descr': f'南网竞赛-{farm_id}',
             'gen_time': time.strftime('%Y-%m-%d %H:%M:%S', time.localtime()),
-            'model_table': params['model_table'] + farm_id,
-            'scaler_table': params['scaler_table'] + farm_id
+            'model_table': local_params['model_table'] + farm_id,
+            'scaler_table': local_params['scaler_table'] + farm_id
         })
-        mgUtils.insert_trained_model_into_mongo(ts_model, params)
-        mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, params)
+        mgUtils.insert_trained_model_into_mongo(ts_model, local_params)
+        mgUtils.insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, local_params)
         # 更新算法状态:正常结束
         write_number_to_file(os.path.join(output_file, status_file), 2, 2)
     except Exception as e:

+ 9 - 9
app/predict/tf_cnn_pre.py

@@ -17,12 +17,9 @@ from itertools import chain
 from app.common.logs import logger, params
 from app.common.tf_cnn import CNNHandler
 from app.common.dbmg import MongoUtils
-
+from copy import deepcopy
 np.random.seed(42)  # NumPy随机种子
 
-
-dh = DataHandler(logger, params)
-cnn = CNNHandler(logger, params)
 mgUtils = MongoUtils(logger)
 
 
@@ -35,12 +32,15 @@ def model_prediction(pre_data, input_file, cap):
     output_file = input_file.replace('IN', 'OUT')
     file = 'DQYC_OUT_PREDICT_POWER.txt'
     status_file = 'STATUS.TXT'
+    local_params = deepcopy(params)
+    dh = DataHandler(logger, local_params)
+    cnn = CNNHandler(logger, local_params)
     try:
-        params['model_table'] += farm_id
-        params['scaler_table'] += farm_id
-        feature_scaler, target_scaler = mgUtils.get_scaler_model_from_mongo(params)
+        local_params['model_table'] += farm_id
+        local_params['scaler_table'] += farm_id
+        feature_scaler, target_scaler = mgUtils.get_scaler_model_from_mongo(local_params)
         cnn.opt.cap = round(target_scaler.transform(np.array([[cap]]))[0, 0], 2)
-        cnn.get_model(params)
+        cnn.get_model(local_params)
         dh.opt.features = json.loads(cnn.model_params).get('Model').get('features', ','.join(cnn.opt.features)).split(',')
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
 
@@ -51,7 +51,7 @@ def model_prediction(pre_data, input_file, cap):
         res = list(chain.from_iterable(target_scaler.inverse_transform([cnn.predict(scaled_pre_x).flatten()])))
         pre_data['Power'] = res[:len(pre_data)]
         pre_data['PlantID'] = farm_id
-        pre_data = pre_data[['PlantID', params['col_time'], 'Power']]
+        pre_data = pre_data[['PlantID', local_params['col_time'], 'Power']]
 
         pre_data.loc[:, 'Power'] = pre_data['Power'].round(2)
         pre_data.loc[pre_data['Power'] > cap, 'Power'] = cap

+ 9 - 9
app/predict/tf_fmi_pre.py

@@ -17,12 +17,9 @@ from itertools import chain
 from app.common.logs import logger, params
 from app.common.tf_fmi import FMIHandler
 from app.common.dbmg import MongoUtils
-
+from copy import deepcopy
 np.random.seed(42)  # NumPy随机种子
 
-
-dh = DataHandler(logger, params)
-fmi = FMIHandler(logger, params)
 mgUtils = MongoUtils(logger)
 
 
@@ -35,12 +32,15 @@ def model_prediction(pre_data, input_file, cap):
     output_file = input_file.replace('IN', 'OUT')
     file = 'DQYC_OUT_PREDICT_POWER.txt'
     status_file = 'STATUS.TXT'
+    local_params = deepcopy(params)
+    dh = DataHandler(logger, local_params)
+    fmi = FMIHandler(logger, local_params)
     try:
-        params['model_table'] += farm_id
-        params['scaler_table'] += farm_id
-        feature_scaler, target_scaler = mgUtils.get_scaler_model_from_mongo(params)
+        local_params['model_table'] += farm_id
+        local_params['scaler_table'] += farm_id
+        feature_scaler, target_scaler = mgUtils.get_scaler_model_from_mongo(local_params)
         fmi.opt.cap = round(target_scaler.transform(np.array([[cap]]))[0, 0], 2)
-        fmi.get_model(params)
+        fmi.get_model(local_params)
         dh.opt.features = json.loads(fmi.model_params).get('Model').get('features', ','.join(fmi.opt.features)).split(',')
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
 
@@ -51,7 +51,7 @@ def model_prediction(pre_data, input_file, cap):
         res = list(chain.from_iterable(target_scaler.inverse_transform([fmi.predict(scaled_pre_x).flatten()])))
         pre_data['Power'] = res[:len(pre_data)]
         pre_data['PlantID'] = farm_id
-        pre_data = pre_data[['PlantID', params['col_time'], 'Power']]
+        pre_data = pre_data[['PlantID', local_params['col_time'], 'Power']]
 
         pre_data.loc[:, 'Power'] = pre_data['Power'].round(2)
         pre_data.loc[pre_data['Power'] > cap, 'Power'] = cap

+ 9 - 8
app/predict/tf_lstm_pre.py

@@ -17,12 +17,10 @@ from itertools import chain
 from app.common.logs import logger, params
 from app.common.tf_lstm import TSHandler
 from app.common.dbmg import MongoUtils
+from copy import deepcopy
 
 np.random.seed(42)  # NumPy随机种子
 
-
-dh = DataHandler(logger, params)
-ts = TSHandler(logger, params)
 mgUtils = MongoUtils(logger)
 
 
@@ -35,12 +33,15 @@ def model_prediction(pre_data, input_file, cap):
     output_file = input_file.replace('IN', 'OUT')
     file = 'DQYC_OUT_PREDICT_POWER.txt'
     status_file = 'STATUS.TXT'
+    local_params = deepcopy(params)
+    ts = TSHandler(logger, local_params)
+    dh = DataHandler(logger, local_params)
     try:
-        params['model_table'] += farm_id
-        params['scaler_table'] += farm_id
-        feature_scaler, target_scaler = mgUtils.get_scaler_model_from_mongo(params)
+        local_params['model_table'] += farm_id
+        local_params['scaler_table'] += farm_id
+        feature_scaler, target_scaler = mgUtils.get_scaler_model_from_mongo(local_params)
         ts.opt.cap = round(target_scaler.transform(np.array([[cap]]))[0, 0], 2)
-        ts.get_model(params)
+        ts.get_model(local_params)
         dh.opt.features = json.loads(ts.model_params).get('Model').get('features', ','.join(ts.opt.features)).split(',')
         scaled_pre_x, pre_data = dh.pre_data_handler(pre_data, feature_scaler)
 
@@ -51,7 +52,7 @@ def model_prediction(pre_data, input_file, cap):
         res = list(chain.from_iterable(target_scaler.inverse_transform([ts.predict(scaled_pre_x).flatten()])))
         pre_data['Power'] = res[:len(pre_data)]
         pre_data['PlantID'] = farm_id
-        pre_data = pre_data[['PlantID', params['col_time'], 'Power']]
+        pre_data = pre_data[['PlantID', local_params['col_time'], 'Power']]
 
         pre_data.loc[:, 'Power'] = pre_data['Power'].round(2)
         pre_data.loc[pre_data['Power'] > cap, 'Power'] = cap