David 2 月之前
父節點
當前提交
2f04a4c7c9

+ 4 - 6
data_processing/data_operation/data_handler.py

@@ -5,7 +5,6 @@
 # @Author    :David
 # @Company: shenyang JY
 import argparse, numbers, joblib
-
 import numpy as np
 import pandas as pd
 from io import BytesIO
@@ -158,7 +157,7 @@ class DataHandler(object):
                 vy.append(data[1])
         return tx, vx, ty, vy
 
-    def train_data_handler(self, data, opt, bp_data=False):
+    def train_data_handler(self, data, bp_data=False):
         """
         训练数据预处理:
         清洗+补值+归一化
@@ -171,7 +170,7 @@ class DataHandler(object):
             y_train
             y_valid
         """
-        col_time, features, target = opt.col_time, opt.features, opt.target
+        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
         # 清洗限电记录
         if 'is_limit' in data.columns:
             data = data[data['is_limit'] == False]
@@ -211,7 +210,7 @@ class DataHandler(object):
             train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
         return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes
 
-    def pre_data_handler(self, data, feature_scaler, opt, bp_data=False):
+    def pre_data_handler(self, data, feature_scaler, bp_data=False):
         """
         预测数据简单处理
         Args:
@@ -225,7 +224,7 @@ class DataHandler(object):
             data = data[data['is_limit'] == False]
         # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
         #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
-        col_time, features = opt.col_time, opt.features
+        col_time, features = self.opt.col_time, self.opt.features
         pre_data = data.sort_values(by=col_time)[features]
         scaled_features = feature_scaler.transform(pre_data[features])
         pre_data[features] = scaled_features
@@ -233,5 +232,4 @@ class DataHandler(object):
             pre_x = np.array(pre_data)
         else:
             pre_x = self.get_predict_data([pre_data], features)
-            pre_x = np.array(pre_x)
         return pre_x

+ 11 - 9
models_processing/model_koi/tf_cnn.py

@@ -14,11 +14,13 @@ from models_processing.losses.loss_cdq import rmse
 import numpy as np
 from common.database_dml import *
 from threading import Lock
+import argparse
 model_lock = Lock()
 
 class CNNHandler(object):
-    def __init__(self, logger):
+    def __init__(self, logger, args):
         self.logger = logger
+        self.opt = argparse.Namespace(**args)
         self.model = None
 
     def get_model(self, args):
@@ -52,28 +54,28 @@ class CNNHandler(object):
         model.compile(loss=rmse, optimizer=adam)
         return model
 
-    def train_init(self, opt):
+    def train_init(self):
         try:
-            if opt.Model['add_train']:
+            if self.opt.Model['add_train']:
                 # 进行加强训练,支持修模
-                base_train_model = get_h5_model_from_mongo(vars(opt), {'rmse': rmse})
+                base_train_model = get_h5_model_from_mongo(vars(self.opt), {'rmse': rmse})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:
-                base_train_model = self.get_keras_model(opt)
+                base_train_model = self.get_keras_model(self.opt)
             return base_train_model
         except Exception as e:
             self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
 
-    def training(self, opt, train_and_valid_data):
-        model = self.train_init(opt)
+    def training(self, train_and_valid_data):
+        model = self.train_init()
         # tf.reset_default_graph() # 清除默认图
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         print("----------", np.array(train_x[0]).shape)
         print("++++++++++", np.array(train_x[1]).shape)
         model.summary()
-        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
-        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
+        early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')
+        history = model.fit(train_x, train_y, batch_size=self.opt.Model['batch_size'], epochs=self.opt.Model['epoch'], verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
         loss = np.round(history.history['loss'], decimals=5)
         val_loss = np.round(history.history['val_loss'], decimals=5)
         self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))

+ 50 - 44
models_processing/model_koi/tf_cnn_pre.py

@@ -25,44 +25,50 @@ app = Flask('tf_cnn_pre——service')
 
 with app.app_context():
     with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
+        args = yaml.safe_load(f)
 
-    dh = DataHandler(logger, arguments)
-    cnn = CNNHandler(logger)
+    dh = DataHandler(logger, args)
+    cnn = CNNHandler(logger, args)
+    global opt
 
+@app.before_request
+def update_config():
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    cnn.opt = opt
+    logger.info(args)
 
-@app.route('/nn_bp_predict', methods=['POST'])
+@app.route('/nn_cnn_predict', methods=['POST'])
 def model_prediction_bp():
     # 获取程序开始时间
     start_time = time.time()
     result = {}
     success = 0
     print("Program starts execution!")
-    params_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(params_dict)
     try:
-        print('args', args)
-        logger.info(args)
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
-        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, args)
+        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler)
         cnn.get_model(args)
         # result = bp.predict(scaled_pre_x, args)
-        result = list(chain.from_iterable(target_scaler.inverse_transform([cnn.predict(scaled_pre_x).flatten()])))
-        pre_data['power_forecast'] = result[:len(pre_data)]
+        res = list(chain.from_iterable(target_scaler.inverse_transform([cnn.predict(scaled_pre_x).flatten()])))
+        pre_data['power_forecast'] = res[:len(pre_data)]
         pre_data['farm_id'] = 'J00083'
         pre_data['cdq'] = 1
         pre_data['dq'] = 1
         pre_data['zq'] = 1
-        pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+        pre_data.rename(columns={args['col_time']: 'date_time'}, inplace=True)
         pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
 
         pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
         pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
         pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
 
-        insert_data_into_mongo(pre_data, arguments)
+        insert_data_into_mongo(pre_data, args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()
@@ -84,36 +90,36 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=1010x, threads=4)
+    serve(app, host="0.0.0.0", port=1010, threads=4)
     print("server start!")
 
     # ------------------------测试代码------------------------
-    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
-                 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time', 'mongodb_write_table': 'j00083_rs',
-                 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    cnn = CNNHandler(logger)
-    opt = argparse.Namespace(**arguments)
-
-    opt.Model['input_size'] = len(opt.features)
-    pre_data = get_data_from_mongo(args_dict)
-    feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
-    pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt)
-    cnn.get_model(arguments)
-    result = cnn.predict(pre_x)
-    result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
-    pre_data['power_forecast'] = result1[:len(pre_data)]
-    pre_data['farm_id'] = 'J00083'
-    pre_data['cdq'] = 1
-    pre_data['dq'] = 1
-    pre_data['zq'] = 1
-    pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
-    pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
-
-    pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
-    pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
-    pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
-
-    insert_data_into_mongo(pre_data, arguments)
+    # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+    #              'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time', 'mongodb_write_table': 'j00083_rs',
+    #              'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # cnn = CNNHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    #
+    # opt.Model['input_size'] = len(opt.features)
+    # pre_data = get_data_from_mongo(args_dict)
+    # feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
+    # pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt)
+    # cnn.get_model(arguments)
+    # result = cnn.predict(pre_x)
+    # result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
+    # pre_data['power_forecast'] = result1[:len(pre_data)]
+    # pre_data['farm_id'] = 'J00083'
+    # pre_data['cdq'] = 1
+    # pre_data['dq'] = 1
+    # pre_data['zq'] = 1
+    # pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+    # pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
+    #
+    # pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
+    # pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
+    # pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
+    #
+    # insert_data_into_mongo(pre_data, arguments)

+ 17 - 10
models_processing/model_koi/tf_cnn_train.py

@@ -26,7 +26,19 @@ with app.app_context():
         args = yaml.safe_load(f)
 
     dh = DataHandler(logger, args)
-    cnn = CNNHandler(logger)
+    cnn = CNNHandler(logger, args)
+    global opt
+
+@app.before_request
+def update_config():
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    cnn.opt = opt
+    logger.info(args)
 
 @app.route('/nn_cnn_training', methods=['POST'])
 def model_training_bp():
@@ -35,18 +47,13 @@ def model_training_bp():
     result = {}
     success = 0
     print("Program starts execution!")
-    args_dict = request.values.to_dict()
-    args_dict['features'] = args_dict['features'].split(',')
-    args.update(args_dict)
-    opt = argparse.Namespace(**args)
-    logger.info(args_dict)
     try:
         # ------------ 获取数据,预处理训练数据 ------------
-        train_data = get_data_from_mongo(args_dict)
-        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
+        train_data = get_data_from_mongo(args)
+        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data)
         # ------------ 训练模型,保存模型 ------------
-        opt.Model['input_size'] = train_x.shape[2]
-        bp_model = cnn.training(opt, [train_x, valid_x, train_y, valid_y])
+        cnn.opt.Model['input_size'] = train_x.shape[2]
+        bp_model = cnn.training([train_x, valid_x, train_y, valid_y])
 
         args['params'] = json.dumps(args)
         args['descr'] = '测试'