
Merge branch 'dev_david' of anweiguo/algorithm_platform into dev_awg

liudawei, 2 months ago
Parent
Commit
a1d497e856

+ 4 - 4
common/data_cleaning.py

@@ -7,12 +7,12 @@
 import numpy as np
 np.random.seed(42)
 
-def cleaning(df, name, logger, cols=None, dup=True):
+def cleaning(df, name, logger, cols=None, dup=True, col_time='dateTime'):
     logger.info("开始清洗:{}……".format(name))
     data = df.copy()
     data = data_column_cleaning(data, logger)
     if dup:
-        data = rm_duplicated(data, logger)
+        data = rm_duplicated(data, logger, col_time)
     if cols is not None:
         data = key_field_row_cleaning(data, cols, logger)
     return data
@@ -65,7 +65,7 @@ def key_field_row_cleaning(data, cols, logger):
         logger.info("行清洗:清洗的行数有:{},缺失的列有:{}".format(rows_pre-rows_late, ', '.join(nan_cols)))
     return data
 
-def rm_duplicated(data, logger):
+def rm_duplicated(data, logger, col_time='dateTime'):
     """
     按照时间去重
     :param data:
@@ -74,7 +74,7 @@ def rm_duplicated(data, logger):
     """
     # 按照时间去重
     rows_pre = len(data)
-    data = data.drop_duplicates(subset='dateTime')
+    data = data.drop_duplicates(subset=col_time)
     rows_late = len(data)
     if rows_pre - rows_late > 0:
         logger.info("时间去重的行数有:{}".format(rows_pre - rows_late))
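
For reference, a minimal usage sketch of the updated cleaning() signature. The DataFrame, the 'date_time' column name, and the logger setup here are illustrative assumptions, not part of the commit:

    import logging
    import pandas as pd
    from common.data_cleaning import cleaning

    logger = logging.getLogger(__name__)
    df = pd.DataFrame({
        'date_time': ['2025-01-01 00:00', '2025-01-01 00:00', '2025-01-01 00:15'],
        'speed10':   [3.2, 3.2, None],
        'realPower': [1.1, 1.1, 0.9],
    })
    # drop duplicate timestamps on a custom time column, then drop rows missing key fields
    cleaned = cleaning(df, 'train', logger, cols=['speed10', 'realPower'], dup=True, col_time='date_time')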

+ 273 - 0
common/database_dml_koi.py

@@ -0,0 +1,273 @@
+from pymongo import MongoClient, UpdateOne, DESCENDING
+import pandas as pd
+from sqlalchemy import create_engine
+import pickle
+from io import BytesIO
+import joblib
+import h5py
+import tensorflow as tf
+
+def get_data_from_mongo(args):
+    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
+    mongodb_database = args['mongodb_database']
+    mongodb_read_table = args['mongodb_read_table']
+    query_dict = {}
+    if 'timeBegin' in args.keys():
+        timeBegin = args['timeBegin']
+        query_dict.update({"$gte": timeBegin})
+    if 'timeEnd' in args.keys():
+        timeEnd = args['timeEnd']
+        query_dict.update({"$lte": timeEnd})
+
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    collection = db[mongodb_read_table]  # 集合名称
+    if len(query_dict) != 0:
+        query = {"dateTime": query_dict}
+        cursor = collection.find(query)
+    else:
+        cursor = collection.find()
+    data = list(cursor)
+    df = pd.DataFrame(data)
+    # 4. 删除 _id 字段(可选)
+    if '_id' in df.columns:
+        df = df.drop(columns=['_id'])
+    client.close()
+    return df
+
+
+def get_df_list_from_mongo(args):
+    mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'].split(',')
+    df_list = []
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    for table in mongodb_read_table:
+        collection = db[table]  # 集合名称
+        data_from_db = collection.find()  # 这会返回一个游标(cursor)
+        # 将游标转换为列表,并创建 pandas DataFrame
+        df = pd.DataFrame(list(data_from_db))
+        if '_id' in df.columns:
+            df = df.drop(columns=['_id'])
+        df_list.append(df)
+    client.close()
+    return df_list
+
+def insert_data_into_mongo(res_df, args):
+    """
+    插入数据到 MongoDB 集合中,可以选择覆盖、追加或按指定的 key 进行更新插入。
+
+    参数:
+    - res_df: 要插入的 DataFrame 数据
+    - args: 包含 MongoDB 数据库和集合名称的字典
+    - overwrite: 布尔值,True 表示覆盖,False 表示追加
+    - update_keys: 列表,指定用于匹配的 key 列,如果存在则更新,否则插入 'col1','col2'
+    """
+    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
+    mongodb_database = args['mongodb_database']
+    mongodb_write_table = args['mongodb_write_table']
+    overwrite = 1
+    update_keys = None
+    if 'overwrite' in args.keys():
+        overwrite = int(args['overwrite'])
+    if 'update_keys' in args.keys():
+        update_keys = args['update_keys'].split(',')
+
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    collection = db[mongodb_write_table]
+
+    # 覆盖模式:删除现有集合
+    if overwrite:
+        if mongodb_write_table in db.list_collection_names():
+            collection.drop()
+            print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
+
+    # 将 DataFrame 转为字典格式
+    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
+
+    # 如果没有数据,直接返回
+    if not data_dict:
+        print("No data to insert.")
+        return
+
+    # 如果指定了 update_keys,则执行 upsert(更新或插入)
+    if update_keys and not overwrite:
+        operations = []
+        for record in data_dict:
+            # 构建查询条件,用于匹配要更新的文档
+            query = {key: record[key] for key in update_keys}
+            operations.append(UpdateOne(query, {'$set': record}, upsert=True))
+
+        # 批量执行更新/插入操作
+        if operations:
+            result = collection.bulk_write(operations)
+            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
+    else:
+        # 追加模式:直接插入新数据
+        collection.insert_many(data_dict)
+        print("Data inserted successfully!")
+
+
+def get_data_fromMysql(params):
+    mysql_conn = params['mysql_conn']
+    query_sql = params['query_sql']
+    #数据库读取实测气象
+    engine = create_engine(f"mysql+pymysql://{mysql_conn}")
+    # 定义SQL查询
+    env_df = pd.read_sql_query(query_sql, engine)
+    return env_df
+
+
+def insert_pickle_model_into_mongo(model, args):
+    mongodb_connection, mongodb_database, mongodb_write_table, model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", \
+    args['mongodb_database'], args['mongodb_write_table'], args['model_name']
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    # 序列化模型
+    model_bytes = pickle.dumps(model)
+    model_data = {
+        'model_name': model_name,
+        'model': model_bytes,  # 将模型字节流存入数据库
+    }
+    print('Training completed!')
+
+    if mongodb_write_table in db.list_collection_names():
+        db[mongodb_write_table].drop()
+        print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
+    collection = db[mongodb_write_table]  # 集合名称
+    collection.insert_one(model_data)
+    print("model inserted successfully!")
+
+
+def insert_h5_model_into_mongo(model,feature_scaler_bytes,target_scaler_bytes ,args):
+    mongodb_connection,mongodb_database,scaler_table,model_table,model_name = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
+                                args['mongodb_database'],args['scaler_table'],args['model_table'],args['model_name'])
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    if scaler_table in db.list_collection_names():
+        db[scaler_table].drop()
+        print(f"Collection '{scaler_table}' already exists, deleted successfully!")
+    collection = db[scaler_table]  # 集合名称
+    # Save the scalers in MongoDB as binary data
+    collection.insert_one({
+        "feature_scaler": feature_scaler_bytes.read(),
+        "target_scaler": target_scaler_bytes.read()
+    })
+    print("scaler_model inserted successfully!")
+    if model_table in db.list_collection_names():
+        db[model_table].drop()
+        print(f"Collection '{model_table}' already exists, deleted successfully!")
+    model_table = db[model_table]
+    # 创建 BytesIO 缓冲区
+    model_buffer = BytesIO()
+    # 将模型保存为 HDF5 格式到内存 (BytesIO)
+    model.save(model_buffer, save_format='h5')
+    # 将指针移到缓冲区的起始位置
+    model_buffer.seek(0)
+    # 获取模型的二进制数据
+    model_data = model_buffer.read()
+    # 将模型保存到 MongoDB
+    model_table.insert_one({
+        "model_name": model_name,
+        "model_data": model_data
+    })
+    print("模型成功保存到 MongoDB!")
+
+def insert_trained_model_into_mongo(model, args):
+    mongodb_connection,mongodb_database,model_table,model_name = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
+                                args['mongodb_database'],args['model_table'],args['model_name'])
+
+    gen_time, params_json, descr = args['gen_time'], args['params'], args['descr']
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    if model_table in db.list_collection_names():
+        db[model_table].drop()
+        print(f"Collection '{model_table}' already exists, deleted successfully!")
+    model_table = db[model_table]
+    # 创建 BytesIO 缓冲区
+    model_buffer = BytesIO()
+    # 将模型保存为 HDF5 格式到内存 (BytesIO)
+    model.save(model_buffer, save_format='h5')
+    # 将指针移到缓冲区的起始位置
+    model_buffer.seek(0)
+    # 获取模型的二进制数据
+    model_data = model_buffer.read()
+    # 将模型保存到 MongoDB
+    model_table.insert_one({
+        "model_name": model_name,
+        "model_data": model_data,
+        "gen_time": gen_time,
+        "params": params_json,
+        "descr": descr
+    })
+    print("模型成功保存到 MongoDB!")
+
+def insert_scaler_model_into_mongo(feature_scaler_bytes, scaled_target_bytes, args):
+    mongodb_connection,mongodb_database,scaler_table,model_table,model_name = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
+                                args['mongodb_database'],args['scaler_table'],args['model_table'],args['model_name'])
+    gen_time = args['gen_time']
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    if scaler_table in db.list_collection_names():
+        db[scaler_table].drop()
+        print(f"Collection '{scaler_table}' already exists, deleted successfully!")
+    collection = db[scaler_table]  # 集合名称
+    # Save the scalers in MongoDB as binary data
+    collection.insert_one({
+        "model_name": model_name,
+        "gen_time": gen_time,
+        "feature_scaler": feature_scaler_bytes.read(),
+        "target_scaler": scaled_target_bytes.read()
+    })
+    print("scaler_model inserted successfully!")
+
+
+def get_h5_model_from_mongo(args, custom=None):
+    mongodb_connection,mongodb_database,model_table,model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['model_table'],args['model_name']
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    collection = db[model_table]  # 集合名称
+
+     # 查询 MongoDB 获取模型数据
+    model_doc = collection.find_one({"model_name": model_name}, sort=[('gen_time', DESCENDING)])
+    if model_doc:
+        model_data = model_doc['model_data']  # 获取模型的二进制数据
+        # 将二进制数据加载到 BytesIO 缓冲区
+        model_buffer = BytesIO(model_data)
+        # 从缓冲区加载模型
+         # 使用 h5py 和 BytesIO 从内存中加载模型
+        with h5py.File(model_buffer, 'r') as f:
+            model = tf.keras.models.load_model(f, custom_objects=custom)
+        print(f"{model_name}模型成功从 MongoDB 加载!")
+        client.close()
+        return model
+    else:
+        print(f"未找到model_name为 {model_name} 的模型。")
+        client.close()
+        return None
+
+
+def get_scaler_model_from_mongo(args, only_feature_scaler=False):
+    """
+    根据模型名称版本和生成时间获取模型
+    """
+    mongodb_connection, mongodb_database, scaler_table = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/", args['mongodb_database'], args['scaler_table'])
+    model_name, gen_time = args['model_name'], args['gen_time']
+    client = MongoClient(mongodb_connection)
+    # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
+    db = client[mongodb_database]
+    collection = db[scaler_table]  # 集合名称
+    # Retrieve the scalers from MongoDB
+    scaler_doc = collection.find_one({"model_name": model_name, "gen_time": gen_time})
+    # Deserialize the scalers
+
+    feature_scaler_bytes = BytesIO(scaler_doc["feature_scaler"])
+    feature_scaler = joblib.load(feature_scaler_bytes)
+    if only_feature_scaler:
+        return feature_scaler
+    target_scaler_bytes = BytesIO(scaler_doc["target_scaler"])
+    target_scaler = joblib.load(target_scaler_bytes)
+    return feature_scaler,target_scaler
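
A sketch of how the new helpers in common/database_dml_koi.py might be called. The argument keys follow the functions above; the database name, collection names, time window, and update_keys columns are assumptions, and the upsert path requires those key columns to exist in the DataFrame:

    from common.database_dml_koi import get_data_from_mongo, insert_data_into_mongo

    # read a time window (timeBegin/timeEnd filter on the 'dateTime' field)
    read_args = {'mongodb_database': 'david_test', 'mongodb_read_table': 'j00083',
                 'timeBegin': '2025-01-01 00:00', 'timeEnd': '2025-01-02 00:00'}
    df = get_data_from_mongo(read_args)

    # upsert by key columns instead of dropping the target collection
    write_args = {'mongodb_database': 'david_test', 'mongodb_write_table': 'j00083_rs',
                  'overwrite': 0, 'update_keys': 'farm_id,dateTime'}
    insert_data_into_mongo(df, write_args)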

+ 20 - 18
data_processing/data_operation/data_handler.py

@@ -5,6 +5,7 @@
 # @Author    :David
 # @Company: shenyang JY
 import argparse, numbers, joblib
+import numpy as np
 import pandas as pd
 from io import BytesIO
 from bson.decimal128 import Decimal128
@@ -156,7 +157,7 @@ class DataHandler(object):
                 vy.append(data[1])
         return tx, vx, ty, vy
 
-    def train_data_handler(self, data, opt, bp_data=False):
+    def train_data_handler(self, data, bp_data=False):
         """
         训练数据预处理:
         清洗+补值+归一化
@@ -169,20 +170,20 @@ class DataHandler(object):
             y_train
             y_valid
         """
-        col_time, features, target = opt.col_time, opt.features, opt.target
-        # 清洗处理好的限电记录
+        col_time, features, target = self.opt.col_time, self.opt.features, self.opt.target
+        # 清洗限电记录
         if 'is_limit' in data.columns:
             data = data[data['is_limit'] == False]
-        # 筛选特征,数值化
+        # 筛选特征,数值化,排序
         train_data = data[[col_time] + features + [target]]
+        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        train_data = train_data.sort_values(by=col_time)
         # 清洗特征平均缺失率大于20%的天
         # train_data = missing_features(train_data, features, col_time)
-        train_data = train_data.sort_values(by=col_time)
-        # train_data = train_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
-        # 对清洗完限电的数据进行特征预处理:1.空值异常值清洗 2.缺值补值
-        train_data_cleaned = key_field_row_cleaning(train_data, features + [target], self.logger)
-        train_data_cleaned = train_data_cleaned.applymap(
-            lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        # 对清洗完限电的数据进行特征预处理:
+        # 1.空值异常值清洗
+        train_data_cleaned = cleaning(train_data, '训练集', self.logger, features + [target], col_time=col_time)
+        # 2. 标准化
         # 创建特征和目标的标准化器
         train_scaler = MinMaxScaler(feature_range=(0, 1))
         target_scaler = MinMaxScaler(feature_range=(0, 1))
@@ -191,12 +192,11 @@ class DataHandler(object):
         scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
         train_data_cleaned[features] = scaled_train_data
         train_data_cleaned[[target]] = scaled_target
-
+        # 3.缺值补值
         train_datas = self.fill_train_data(train_data_cleaned, col_time)
         # 保存两个scaler
         scaled_train_bytes = BytesIO()
         scaled_target_bytes = BytesIO()
-
         joblib.dump(train_scaler, scaled_train_bytes)
         joblib.dump(target_scaler, scaled_target_bytes)
         scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
@@ -204,12 +204,13 @@ class DataHandler(object):
 
         if bp_data:
             train_data = pd.concat(train_datas, axis=0)
-            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data, scaled_target, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x, valid_x, train_y, valid_y = self.train_valid_split(train_data[features].values, train_data[target].values, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x, valid_x, train_y, valid_y =  np.array(train_x), np.array(valid_x), np.array(train_y), np.array(valid_y)
         else:
             train_x, valid_x, train_y, valid_y = self.get_train_data(train_datas, col_time, features, target)
-        return train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes
+        return train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes
 
-    def pre_data_handler(self, data, feature_scaler, opt, bp_data=False):
+    def pre_data_handler(self, data, feature_scaler, bp_data=False):
         """
         预测数据简单处理
         Args:
@@ -218,16 +219,17 @@ class DataHandler(object):
         return:
             scaled_features: 反归一化的特征
         """
+        # 清洗限电记录
         if 'is_limit' in data.columns:
             data = data[data['is_limit'] == False]
         # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
         #     args['time_steps']), args['col_time'], args['model_name'], str_to_list(args['col_reserve'])
-        col_time, features = opt.col_time, opt.features
+        col_time, features = self.opt.col_time, self.opt.features
         pre_data = data.sort_values(by=col_time)[features]
         scaled_features = feature_scaler.transform(pre_data[features])
         pre_data[features] = scaled_features
         if bp_data:
-            pre_x = self.get_predict_data([pre_data], features)
+            pre_x = np.array(pre_data)
         else:
-            pre_x = pre_data.values
+            pre_x = self.get_predict_data([pre_data], features)
         return pre_x
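
Since the handler now reads its options from self.opt, the reworked entry points can be driven as in this sketch. Here args stands for the yaml config merged with request parameters as in the services below, and bp_data=True returns flat 2-D arrays in the new (train_x, train_y, valid_x, valid_y) ordering:

    from data_processing.data_operation.data_handler import DataHandler

    dh = DataHandler(logger, args)   # args: yaml config dict merged with request params
    train_x, train_y, valid_x, valid_y, train_scaler_bytes, target_scaler_bytes = \
        dh.train_data_handler(train_data, bp_data=True)   # note the new return order
    pre_x = dh.pre_data_handler(pre_data, feature_scaler, bp_data=True)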

+ 4 - 0
evaluation_processing/analysis_cdq.py

@@ -157,6 +157,10 @@ def put_analysis_report_to_html(args, df_predict, df_accuracy):
             agg_dict['deviationAssessment'] = [np.nanmean, np.nansum]
             rename_cols.append('考核分数平均值')
             rename_cols.append('考核总分数')
+        if 'accuracyAssessment' in df_accuracy.columns:
+            agg_dict['accuracyAssessment'] = [np.nanmean, np.nansum]
+            rename_cols.append('考核分数平均值')
+            rename_cols.append('考核总分数')
         # 进行分组聚合,如果有需要聚合的列
         summary_df = df_accuracy.groupby('model').agg(agg_dict).reset_index()
         summary_df.columns = rename_cols

+ 4 - 4
models_processing/losses/loss_cdq.py

@@ -31,8 +31,8 @@ class SouthLoss(tf.keras.losses.Loss):
         :return: 损失值
         """
         # 计算实际和预测的差值
-        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        # y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        # y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
         y_true = y_true[:, 15]
         y_predict = y_predict[:, 15]
         diff = y_true - y_predict
@@ -80,8 +80,8 @@ class NorthEastLoss(tf.keras.losses.Loss):
 
     def call(self, y_true, y_predict):
         # 这里我们添加了一个小的 epsilon 值来避免除以 0
-        y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
-        y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        # y_true = y_true * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
+        # y_predict = y_predict * self.opt.std['C_REAL_VALUE'] + self.opt.mean['C_REAL_VALUE']
 
         mask_logical = tf.logical_and(tf.greater(y_true, self.cap), tf.greater(y_predict, self.cap))
         # mask = tf.cast(~mask_logical, tf.float32)

+ 17 - 24
models_processing/model_koi/tf_bp.py

@@ -4,7 +4,7 @@
 # @Time      :2025/2/13 13:34
 # @Author    :David
 # @Company: shenyang JY
-
+from tensorflow.keras.models import Sequential
 from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
 from tensorflow.keras.models import Model, load_model
 from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
@@ -13,11 +13,13 @@ from models_processing.losses.loss_cdq import rmse
 import numpy as np
 from common.database_dml import *
 from threading import Lock
+import argparse
 model_lock = Lock()
 
 class BPHandler(object):
-    def __init__(self, logger):
+    def __init__(self, logger, args):
         self.logger = logger
+        self.opt = argparse.Namespace(**args)
         self.model = None
 
     def get_model(self, args):
@@ -33,46 +35,37 @@ class BPHandler(object):
 
     @staticmethod
     def get_keras_model(opt):
-        # db_loss = NorthEastLoss(opt)
-        # south_loss = SouthLoss(opt)
-        l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
-        l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
-        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
-
-        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
-        d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
-        nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
-
-        output = Dense(1, name='d5')(nwp)
-        output_f = Flatten()(output)
-        model = Model(nwp_input, output_f)
+        model = Sequential([
+            Dense(64, input_dim=opt.Model['input_size'], activation='relu'),  # 输入层和隐藏层,10个神经元
+            Dense(32, activation='relu'),  # 隐藏层,8个神经元
+            Dense(1, activation='linear')  # 输出层,1个神经元(用于回归任务)
+        ])
         adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
-        reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
         model.compile(loss=rmse, optimizer=adam)
         return model
 
-    def train_init(self, opt):
+    def train_init(self):
         try:
-            if opt.Model['add_train']:
+            if self.opt.Model['add_train']:
                 # 进行加强训练,支持修模
-                base_train_model = get_h5_model_from_mongo(vars(opt), {'rmse': rmse})
+                base_train_model = get_h5_model_from_mongo(vars(self.opt), {'rmse': rmse})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:
-                base_train_model = self.get_keras_model(opt)
+                base_train_model = self.get_keras_model(self.opt)
             return base_train_model
         except Exception as e:
             self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
 
-    def training(self, opt, train_and_valid_data):
-        model = self.train_init(opt)
+    def training(self, train_and_valid_data):
+        model = self.train_init()
         # tf.reset_default_graph() # 清除默认图
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         print("----------", np.array(train_x[0]).shape)
         print("++++++++++", np.array(train_x[1]).shape)
         model.summary()
-        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
-        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,  validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
+        early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')
+        history = model.fit(train_x, train_y, batch_size=self.opt.Model['batch_size'], epochs=self.opt.Model['epoch'], verbose=2,  validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
         loss = np.round(history.history['loss'], decimals=5)
         val_loss = np.round(history.history['val_loss'], decimals=5)
         self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
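
A sketch of training the simplified Sequential BP network with the updated handler. args is assumed to come from bp.yaml plus request parameters, and train_x/valid_x are the 2-D arrays produced by train_data_handler(..., bp_data=True):

    from models_processing.model_koi.tf_bp import BPHandler

    bp = BPHandler(logger, args)
    bp.opt.Model['input_size'] = train_x.shape[1]   # number of features feeding the first Dense layer
    bp_model = bp.training([train_x, train_y, valid_x, valid_y])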

+ 47 - 42
models_processing/model_koi/tf_bp_pre.py

@@ -25,11 +25,21 @@ app = Flask('tf_bp_pre——service')
 
 with app.app_context():
     with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
-
-    dh = DataHandler(logger, arguments)
-    bp = BPHandler(logger)
+        args = yaml.safe_load(f)
+    dh = DataHandler(logger, args)
+    bp = BPHandler(logger, args)
+    global opt
 
+@app.before_request
+def update_config():
+    global opt
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    bp.opt = opt
+    logger.info(args)
 
 @app.route('/nn_bp_predict', methods=['POST'])
 def model_prediction_bp():
@@ -38,31 +48,26 @@ def model_prediction_bp():
     result = {}
     success = 0
     print("Program starts execution!")
-    params_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(params_dict)
     try:
-        print('args', args)
-        logger.info(args)
+        # ------------ 获取数据,预处理预测数据------------
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
-        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, args, bp_data=True)
+        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, bp_data=True)
         bp.get_model(args)
-        # result = bp.predict(scaled_pre_x, args)
-        result = list(chain.from_iterable(target_scaler.inverse_transform([bp.predict(scaled_pre_x).flatten()])))
-        pre_data['power_forecast'] = result[:len(pre_data)]
+        res = list(chain.from_iterable(target_scaler.inverse_transform([bp.predict(scaled_pre_x).flatten()])))
+        pre_data['power_forecast'] = res[:len(pre_data)]
         pre_data['farm_id'] = 'J00083'
         pre_data['cdq'] = 1
         pre_data['dq'] = 1
         pre_data['zq'] = 1
-        pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+        pre_data.rename(columns={args['col_time']: 'date_time'}, inplace=True)
         pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
 
         pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
         pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
         pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
 
-        insert_data_into_mongo(pre_data, arguments)
+        insert_data_into_mongo(pre_data, args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()
@@ -84,36 +89,36 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=1010x, threads=4)
+    serve(app, host="0.0.0.0", port=1010, threads=4)
     print("server start!")
 
     # ------------------------测试代码------------------------
     args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
                  'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time', 'mongodb_write_table': 'j00083_rs',
                  'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    bp = BPHandler(logger)
-    opt = argparse.Namespace(**arguments)
-
-    opt.Model['input_size'] = len(opt.features)
-    pre_data = get_data_from_mongo(args_dict)
-    feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
-    pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt, bp_data=True)
-    bp.get_model(arguments)
-    result = bp.predict(pre_x)
-    result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
-    pre_data['power_forecast'] = result1[:len(pre_data)]
-    pre_data['farm_id'] = 'J00083'
-    pre_data['cdq'] = 1
-    pre_data['dq'] = 1
-    pre_data['zq'] = 1
-    pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
-    pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
-
-    pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
-    pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
-    pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
-
-    insert_data_into_mongo(pre_data, arguments)
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # bp = BPHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    #
+    # opt.Model['input_size'] = len(opt.features)
+    # pre_data = get_data_from_mongo(args_dict)
+    # feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
+    # pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt, bp_data=True)
+    # bp.get_model(arguments)
+    # result = bp.predict(pre_x)
+    # result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
+    # pre_data['power_forecast'] = result1[:len(pre_data)]
+    # pre_data['farm_id'] = 'J00083'
+    # pre_data['cdq'] = 1
+    # pre_data['dq'] = 1
+    # pre_data['zq'] = 1
+    # pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+    # pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
+    #
+    # pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
+    # pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
+    # pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
+    #
+    # insert_data_into_mongo(pre_data, arguments)
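
A hypothetical client call against the prediction service. The parameter names mirror the commented test dict above; the host, port, feature subset, and gen_time value are assumptions:

    import requests

    payload = {'mongodb_database': 'david_test', 'mongodb_read_table': 'j00083_test',
               'mongodb_write_table': 'j00083_rs', 'scaler_table': 'j00083_scaler',
               'model_table': 'j00083_model', 'model_name': 'bp1.0.test',
               'col_time': 'date_time', 'gen_time': '2025-02-13 13:34:00',
               'features': 'speed10,direction10,speed30,direction30'}
    r = requests.post('http://127.0.0.1:1010/nn_bp_predict', data=payload)
    print(r.json())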

+ 51 - 41
models_processing/model_koi/tf_bp_train.py

@@ -24,10 +24,21 @@ app = Flask('tf_bp_train——service')
 
 with app.app_context():
     with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
-
-    dh = DataHandler(logger, arguments)
+        args = yaml.safe_load(f)
+    dh = DataHandler(logger, args)
     bp = BPHandler(logger)
+    global opt
+
+@app.before_request
+def update_config():
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    bp.opt = opt
+    logger.info(args)
 
 @app.route('/nn_bp_training', methods=['POST'])
 def model_training_bp():
@@ -36,24 +47,23 @@ def model_training_bp():
     result = {}
     success = 0
     print("Program starts execution!")
-    args_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(args_dict)
-    try:
-        opt = argparse.Namespace(**args)
-        logger.info(args_dict)
-        train_data = get_data_from_mongo(args_dict)
-        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt, bp_data=True)
-        bp_model = bp.training(opt, [train_x, valid_x, train_y, valid_y])
-        args_dict['params'] = json.dumps(args)
-        args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-        insert_trained_model_into_mongo(bp_model, args_dict)
-        insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args)
-        success = 1
-    except Exception as e:
-        my_exception = traceback.format_exc()
-        my_exception.replace("\n", "\t")
-        result['msg'] = my_exception
+    # try:
+    # ------------ 获取数据,预处理训练数据 ------------
+    train_data = get_data_from_mongo(args)
+    train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, bp_data=True)
+    # ------------ 训练模型,保存模型 ------------
+    bp.opt.Model['input_size'] = train_x.shape[1]
+    bp_model = bp.training([train_x, train_y, valid_x, valid_y])
+    args['params'] = json.dumps(args)
+    args['descr'] = '测试'
+    args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    insert_trained_model_into_mongo(bp_model, args)
+    insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args)
+    success = 1
+    # except Exception as e:
+    #     my_exception = traceback.format_exc()
+    #     my_exception.replace("\n", "\t")
+    #     result['msg'] = my_exception
     end_time = time.time()
 
     result['success'] = success
@@ -70,23 +80,23 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=10103, threads=4)
-    print("server start!")
-    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
-    'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
-    'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    bp = BPHandler(logger)
-    opt = argparse.Namespace(**arguments)
-    opt.Model['input_size'] = len(opt.features)
-    train_data = get_data_from_mongo(args_dict)
-    train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt, bp_data=True)
-    bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
-
-    args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-    args_dict['params'] = arguments
-    args_dict['descr'] = '测试'
-    insert_trained_model_into_mongo(bp_model, args_dict)
-    insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)
+    serve(app, host="0.0.0.0", port=10103, threads=4)
+    # print("server start!")
+    # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+    # 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
+    # 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # bp = BPHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    # opt.Model['input_size'] = len(opt.features)
+    # train_data = get_data_from_mongo(args_dict)
+    # train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt, bp_data=True)
+    # bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
+    #
+    # args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    # args_dict['params'] = arguments
+    # args_dict['descr'] = '测试'
+    # insert_trained_model_into_mongo(bp_model, args_dict)
+    # insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)
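
A hypothetical call against the training endpoint; parameter names follow the commented test dict above, while the host and port are assumed from the serve() line:

    import requests

    payload = {'mongodb_database': 'david_test', 'mongodb_read_table': 'j00083',
               'model_table': 'j00083_model', 'scaler_table': 'j00083_scaler',
               'model_name': 'bp1.0.test', 'col_time': 'dateTime',
               'features': 'speed10,direction10,speed30,direction30'}
    r = requests.post('http://127.0.0.1:10103/nn_bp_training', data=payload)
    print(r.json())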

+ 12 - 10
models_processing/model_koi/tf_cnn.py

@@ -14,11 +14,13 @@ from models_processing.losses.loss_cdq import rmse
 import numpy as np
 from common.database_dml import *
 from threading import Lock
+import argparse
 model_lock = Lock()
 
 class CNNHandler(object):
-    def __init__(self, logger):
+    def __init__(self, logger, args):
         self.logger = logger
+        self.opt = argparse.Namespace(**args)
         self.model = None
 
     def get_model(self, args):
@@ -46,34 +48,34 @@ class CNNHandler(object):
 
         output = Dense(1, name='d5')(nwp)
         output_f = Flatten()(output)
-        model = Model(nwp_input, output_f)
+        model = Model(inputs=nwp_input, outputs=output_f)
         adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
         reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
         model.compile(loss=rmse, optimizer=adam)
         return model
 
-    def train_init(self, opt):
+    def train_init(self):
         try:
-            if opt.Model['add_train']:
+            if self.opt.Model['add_train']:
                 # 进行加强训练,支持修模
-                base_train_model = get_h5_model_from_mongo(vars(opt), {'rmse': rmse})
+                base_train_model = get_h5_model_from_mongo(vars(self.opt), {'rmse': rmse})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:
-                base_train_model = self.get_keras_model(opt)
+                base_train_model = self.get_keras_model(self.opt)
             return base_train_model
         except Exception as e:
             self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
 
-    def training(self, opt, train_and_valid_data):
-        model = self.train_init(opt)
+    def training(self, train_and_valid_data):
+        model = self.train_init()
         # tf.reset_default_graph() # 清除默认图
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         print("----------", np.array(train_x[0]).shape)
         print("++++++++++", np.array(train_x[1]).shape)
         model.summary()
-        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
-        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,  validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
+        early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')
+        history = model.fit(train_x, train_y, batch_size=self.opt.Model['batch_size'], epochs=self.opt.Model['epoch'], verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
         loss = np.round(history.history['loss'], decimals=5)
         val_loss = np.round(history.history['val_loss'], decimals=5)
         self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
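
A sketch of the corresponding CNN flow with the handler now owning its options. args is assumed to come from cnn.yaml plus request parameters, and train_x is the 3-D (samples, time_step, features) tensor from train_data_handler:

    from models_processing.model_koi.tf_cnn import CNNHandler

    cnn = CNNHandler(logger, args)
    cnn.opt.Model['input_size'] = train_x.shape[2]  # feature dimension of the time-step windows
    cnn_model = cnn.training([train_x, train_y, valid_x, valid_y])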

+ 50 - 44
models_processing/model_koi/tf_cnn_pre.py

@@ -25,44 +25,50 @@ app = Flask('tf_cnn_pre——service')
 
 with app.app_context():
     with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
+        args = yaml.safe_load(f)
 
-    dh = DataHandler(logger, arguments)
-    cnn = CNNHandler(logger)
+    dh = DataHandler(logger, args)
+    cnn = CNNHandler(logger, args)
+    global opt
 
+@app.before_request
+def update_config():
+    global opt
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    cnn.opt = opt
+    logger.info(args)
 
-@app.route('/nn_bp_predict', methods=['POST'])
+@app.route('/nn_cnn_predict', methods=['POST'])
 def model_prediction_bp():
     # 获取程序开始时间
     start_time = time.time()
     result = {}
     success = 0
     print("Program starts execution!")
-    params_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(params_dict)
     try:
-        print('args', args)
-        logger.info(args)
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
-        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, args)
+        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler)
         cnn.get_model(args)
         # result = bp.predict(scaled_pre_x, args)
-        result = list(chain.from_iterable(target_scaler.inverse_transform([cnn.predict(scaled_pre_x).flatten()])))
-        pre_data['power_forecast'] = result[:len(pre_data)]
+        res = list(chain.from_iterable(target_scaler.inverse_transform([cnn.predict(scaled_pre_x).flatten()])))
+        pre_data['power_forecast'] = res[:len(pre_data)]
         pre_data['farm_id'] = 'J00083'
         pre_data['cdq'] = 1
         pre_data['dq'] = 1
         pre_data['zq'] = 1
-        pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+        pre_data.rename(columns={args['col_time']: 'date_time'}, inplace=True)
         pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
 
         pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
         pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
         pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
 
-        insert_data_into_mongo(pre_data, arguments)
+        insert_data_into_mongo(pre_data, args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()
@@ -84,36 +90,36 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=1010x, threads=4)
+    serve(app, host="0.0.0.0", port=1010, threads=4)
     print("server start!")
 
     # ------------------------测试代码------------------------
-    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
-                 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time', 'mongodb_write_table': 'j00083_rs',
-                 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    cnn = CNNHandler(logger)
-    opt = argparse.Namespace(**arguments)
-
-    opt.Model['input_size'] = len(opt.features)
-    pre_data = get_data_from_mongo(args_dict)
-    feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
-    pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt)
-    cnn.get_model(arguments)
-    result = cnn.predict(pre_x)
-    result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
-    pre_data['power_forecast'] = result1[:len(pre_data)]
-    pre_data['farm_id'] = 'J00083'
-    pre_data['cdq'] = 1
-    pre_data['dq'] = 1
-    pre_data['zq'] = 1
-    pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
-    pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
-
-    pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
-    pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
-    pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
-
-    insert_data_into_mongo(pre_data, arguments)
+    # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+    #              'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time', 'mongodb_write_table': 'j00083_rs',
+    #              'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # cnn = CNNHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    #
+    # opt.Model['input_size'] = len(opt.features)
+    # pre_data = get_data_from_mongo(args_dict)
+    # feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
+    # pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt)
+    # cnn.get_model(arguments)
+    # result = cnn.predict(pre_x)
+    # result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
+    # pre_data['power_forecast'] = result1[:len(pre_data)]
+    # pre_data['farm_id'] = 'J00083'
+    # pre_data['cdq'] = 1
+    # pre_data['dq'] = 1
+    # pre_data['zq'] = 1
+    # pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+    # pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
+    #
+    # pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
+    # pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
+    # pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
+    #
+    # insert_data_into_mongo(pre_data, arguments)

+ 52 - 38
models_processing/model_koi/tf_cnn_train.py

@@ -23,36 +23,50 @@ app = Flask('tf_cnn_train——service')
 
 with app.app_context():
     with open('../model_koi/cnn.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
+        args = yaml.safe_load(f)
 
-    dh = DataHandler(logger, arguments)
-    cnn = CNNHandler(logger)
+    dh = DataHandler(logger, args)
+    cnn = CNNHandler(logger, args)
+    global opt
 
-@app.route('/nn_bp_training', methods=['POST'])
+@app.before_request
+def update_config():
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    cnn.opt = opt
+    logger.info(args)
+
+@app.route('/nn_cnn_training', methods=['POST'])
 def model_training_bp():
     # 获取程序开始时间
     start_time = time.time()
     result = {}
     success = 0
     print("Program starts execution!")
-    args_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(args_dict)
     try:
-        opt = argparse.Namespace(**args)
-        logger.info(args_dict)
-        train_data = get_data_from_mongo(args_dict)
-        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
-        bp_model = cnn.training(opt, [train_x, valid_x, train_y, valid_y])
-        args_dict['params'] = json.dumps(args)
-        args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-        insert_trained_model_into_mongo(bp_model, args_dict)
+        # ------------ 获取数据,预处理训练数据 ------------
+        train_data = get_data_from_mongo(args)
+        train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data)
+        # ------------ 训练模型,保存模型 ------------
+        cnn.opt.Model['input_size'] = train_x.shape[2]
+        bp_model = cnn.training([train_x, train_y, valid_x, valid_y])
+
+        args['params'] = json.dumps(args)
+        args['descr'] = '测试'
+        args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+
+        insert_trained_model_into_mongo(bp_model, args)
         insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args)
         success = 1
     except Exception as e:
-        my_exception = traceback.format_exc()
-        my_exception.replace("\n", "\t")
-        result['msg'] = my_exception
+        # my_exception = traceback.format_exc()
+        # my_exception.replace("\n", "\t")
+        # result['msg'] = my_exception
+        print("???", e)
     end_time = time.time()
 
     result['success'] = success
@@ -69,23 +83,23 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=10103, threads=4)
-    print("server start!")
-    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
-    'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
-    'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    cnn = CNNHandler(logger)
-    opt = argparse.Namespace(**arguments)
-    opt.Model['input_size'] = len(opt.features)
-    train_data = get_data_from_mongo(args_dict)
-    train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
-    cnn_model = cnn.training(opt, [train_x, train_y, valid_x, valid_y])
-
-    args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-    args_dict['params'] = arguments
-    args_dict['descr'] = '测试'
-    insert_trained_model_into_mongo(cnn_model, args_dict)
-    insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)
+    serve(app, host="0.0.0.0", port=10103, threads=4)
+    # print("server start!")
+    # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+    # 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
+    # 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # cnn = CNNHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    # opt.Model['input_size'] = len(opt.features)
+    # train_data = get_data_from_mongo(args_dict)
+    # train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
+    # cnn_model = cnn.training(opt, [train_x, train_y, valid_x, valid_y])
+    #
+    # args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    # args_dict['params'] = arguments
+    # args_dict['descr'] = '测试'
+    # insert_trained_model_into_mongo(cnn_model, args_dict)
+    # insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)

+ 25 - 37
models_processing/model_koi/tf_lstm.py

@@ -5,35 +5,22 @@
 # @Author    :David
 # @Company: shenyang JY
 
-import os.path
-from keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, BatchNormalization, Flatten, Dropout
-from keras.models import Model, load_model
-from keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard
-from keras import optimizers, regularizers
-import keras.backend as K
-from common.database_dml import *
+from tensorflow.keras.layers import Input, Dense, LSTM, concatenate, Conv1D, Conv2D, MaxPooling1D, Reshape, Flatten
+from tensorflow.keras.models import Model, load_model
+from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoard, ReduceLROnPlateau
+from tensorflow.keras import optimizers, regularizers
+from models_processing.losses.loss_cdq import rmse
 import numpy as np
-from sqlalchemy.ext.instrumentation import find_native_user_instrumentation_hook
-np.random.seed(42)
-from models_processing.losses.loss_cdq import SouthLoss, NorthEastLoss
-import tensorflow as tf
-tf.compat.v1.set_random_seed(1234)
+from common.database_dml import *
 from threading import Lock
+import argparse
 model_lock = Lock()
 
-def rmse(y_true, y_pred):
-    return K.sqrt(K.mean(K.square(y_pred - y_true)))
-
-
-var_dir = os.path.dirname(os.path.dirname(__file__))
-
 
 class TSHandler(object):
-    model = None
-    train = False
-
-    def __init__(self, logger):
+    def __init__(self, logger, args):
         self.logger = logger
+        self.opt = argparse.Namespace(**args)
         self.model = None
 
     def get_model(self, args):
@@ -49,41 +36,42 @@ class TSHandler(object):
     @staticmethod
     def get_keras_model(opt):
         # db_loss = NorthEastLoss(opt)
-        south_loss = SouthLoss(opt)
+        # south_loss = SouthLoss(opt)
         l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
         l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
-        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
+        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
 
         con1 = Conv1D(filters=64, kernel_size=5, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
-        nwp = MaxPooling1D(pool_size=5, strides=1, padding='valid', data_format='channels_last')(con1)
-        nwp_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=False, kernel_regularizer=l2_reg)(nwp)
+        con1_p = MaxPooling1D(pool_size=5, strides=1, padding='valid', data_format='channels_last')(con1)
+        nwp_lstm = LSTM(units=opt.Model['hidden_size'], return_sequences=False, kernel_regularizer=l2_reg)(nwp_input)
 
         output = Dense(opt.Model['output_size'], name='cdq_output')(nwp_lstm)
 
         model = Model(nwp_input, output)
         adam = optimizers.Adam(learning_rate=0.001, beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
-        model.compile(loss=south_loss, optimizer=adam)
+        model.compile(loss=rmse, optimizer=adam)
         return model
 
-    def train_init(self, opt):
+    def train_init(self):
         try:
-            if opt.Model['add_train']:
+            if self.opt.Model['add_train']:
                 # 进行加强训练,支持修模
-                base_train_model = get_h5_model_from_mongo(vars(opt), {'rmse': rmse})
+                base_train_model = get_h5_model_from_mongo(vars(self.opt), {'rmse': rmse})
                 base_train_model.summary()
                 self.logger.info("已加载加强训练基础模型")
             else:
-                base_train_model = self.get_keras_model(opt)
+                base_train_model = self.get_keras_model(self.opt)
             return base_train_model
         except Exception as e:
             self.logger.info("加强训练加载模型权重失败:{}".format(e.args))
 
-    def training(self, opt, train_and_valid_data):
-        model = self.train_init(opt)
+    def training(self, train_and_valid_data):
+        model = self.train_init()
         model.summary()
         train_x, train_y, valid_x, valid_y = train_and_valid_data
-        early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
-        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop])
+        early_stop = EarlyStopping(monitor='val_loss', patience=self.opt.Model['patience'], mode='auto')
+        history = model.fit(train_x, train_y, batch_size=self.opt.Model['batch_size'], epochs=self.opt.Model['epoch'],
+                            verbose=2, validation_data=(valid_x, valid_y), callbacks=[early_stop], shuffle=False)
         loss = np.round(history.history['loss'], decimals=5)
         val_loss = np.round(history.history['val_loss'], decimals=5)
         self.logger.info("-----模型训练经过{}轮迭代-----".format(len(loss)))
@@ -91,8 +79,8 @@ class TSHandler(object):
         self.logger.info("验证集损失函数为:{}".format(val_loss))
         return model
 
-    def predict(self, test_X, batch_size=1):
-        result = TSHandler.model.predict(test_X, batch_size=batch_size)
+    def predict(self, test_x, batch_size=1):
+        result = self.model.predict(test_x, batch_size=batch_size)
         self.logger.info("执行预测方法")
         return result
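
A sketch of the same pattern for the LSTM handler. args is assumed to come from lstm.yaml plus request parameters:

    from models_processing.model_koi.tf_lstm import TSHandler

    ts = TSHandler(logger, args)
    ts.opt.Model['input_size'] = train_x.shape[2]
    ts_model = ts.training([train_x, train_y, valid_x, valid_y])
    # at prediction time, get_model(args) loads the stored model into self.model,
    # and ts.predict(pre_x) runs it (the class-level model attribute is gone)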
 

+ 19 - 13
models_processing/model_koi/tf_lstm_pre.py

@@ -25,44 +25,50 @@ app = Flask('tf_lstm_pre——service')
 
 with app.app_context():
     with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
+        args = yaml.safe_load(f)
 
-    dh = DataHandler(logger, arguments)
-    ts = TSHandler(logger)
+    dh = DataHandler(logger, args)
+    ts = TSHandler(logger, args)
+    global opt
 
+@app.before_request
+def update_config():
+    global opt
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    ts.opt = opt
+    logger.info(args)
 
-@app.route('/nn_bp_predict', methods=['POST'])
+@app.route('/nn_lstm_predict', methods=['POST'])
 def model_prediction_bp():
     # 获取程序开始时间
     start_time = time.time()
     result = {}
     success = 0
     print("Program starts execution!")
-    params_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(params_dict)
     try:
-        print('args', args)
-        logger.info(args)
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
         scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, args)
         ts.get_model(args)
         # result = bp.predict(scaled_pre_x, args)
-        result = list(chain.from_iterable(target_scaler.inverse_transform([ts.predict(scaled_pre_x).flatten()])))
-        pre_data['power_forecast'] = result[:len(pre_data)]
+        res = list(chain.from_iterable(target_scaler.inverse_transform([ts.predict(scaled_pre_x).flatten()])))
+        pre_data['power_forecast'] = res[:len(pre_data)]
         pre_data['farm_id'] = 'J00083'
         pre_data['cdq'] = 1
         pre_data['dq'] = 1
         pre_data['zq'] = 1
-        pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+        pre_data.rename(columns={args['col_time']: 'date_time'}, inplace=True)
         pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
 
         pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
         pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
         pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
 
-        insert_data_into_mongo(pre_data, arguments)
+        insert_data_into_mongo(pre_data, args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()

+ 50 - 34
models_processing/model_koi/tf_lstm_train.py

@@ -12,6 +12,8 @@ import logging, argparse
 from data_processing.data_operation.data_handler import DataHandler
 import time, yaml
 from models_processing.model_koi.tf_lstm import TSHandler
+from models_processing.model_koi.tf_cnn import CNNHandler
+
 from common.database_dml import *
 import matplotlib.pyplot as plt
 from common.logs import Log
@@ -23,30 +25,44 @@ app = Flask('tf_lstm_train——service')
 
 with app.app_context():
     with open('../model_koi/lstm.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
+        args = yaml.safe_load(f)
+
+    dh = DataHandler(logger, args)
+    ts = TSHandler(logger, args)
+    # ts = CNNHandler(logger, args)
+    global opt
 
-    dh = DataHandler(logger, arguments)
-    ts = TSHandler(logger)
+@app.before_request
+def update_config():
+    # ------------ 整理参数,整合请求参数 ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    dh.opt = opt
+    ts.opt = opt
+    logger.info(args)
 
-@app.route('/nn_bp_training', methods=['POST'])
+@app.route('/nn_lstm_training', methods=['POST'])
 def model_training_bp():
     # 获取程序开始时间
     start_time = time.time()
     result = {}
     success = 0
     print("Program starts execution!")
-    args_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(args_dict)
     try:
-        opt = argparse.Namespace(**args)
-        logger.info(args_dict)
-        train_data = get_data_from_mongo(args_dict)
-        train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
-        bp_model = ts.training(opt, [train_x, valid_x, train_y, valid_y])
-        args_dict['params'] = json.dumps(args)
-        args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-        insert_trained_model_into_mongo(bp_model, args_dict)
+        # ------------ 获取数据,预处理训练数据 ------------
+        train_data = get_data_from_mongo(args)
+        train_x, train_y, valid_x, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data)
+        # ------------ 训练模型,保存模型 ------------
+        ts.opt.Model['input_size'] = train_x.shape[2]
+        ts_model = ts.training([train_x, train_y, valid_x, valid_y])
+
+        args['params'] = json.dumps(args)
+        args['descr'] = '测试'
+        args['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+
+        insert_trained_model_into_mongo(ts_model, args)
         insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args)
         success = 1
     except Exception as e:
@@ -69,23 +85,23 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=10103, threads=4)
+    serve(app, host="0.0.0.0", port=10103, threads=4)
     print("server start!")
-    args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
-    'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
-    'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    ts = TSHandler(logger)
-    opt = argparse.Namespace(**arguments)
-    opt.Model['input_size'] = len(opt.features)
-    train_data = get_data_from_mongo(args_dict)
-    train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
-    ts_model = ts.training(opt, [train_x, train_y, valid_x, valid_y])
-
-    args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
-    args_dict['params'] = arguments
-    args_dict['descr'] = '测试'
-    insert_trained_model_into_mongo(ts_model, args_dict)
-    insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)
+    # args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
+    # 'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
+    # 'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # ts = TSHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    # opt.Model['input_size'] = len(opt.features)
+    # train_data = get_data_from_mongo(args_dict)
+    # train_x, valid_x, train_y, valid_y, scaled_train_bytes, scaled_target_bytes = dh.train_data_handler(train_data, opt)
+    # ts_model = ts.training(opt, [train_x, train_y, valid_x, valid_y])
+    #
+    # args_dict['gen_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
+    # args_dict['params'] = arguments
+    # args_dict['descr'] = '测试'
+    # insert_trained_model_into_mongo(ts_model, args_dict)
+    # insert_scaler_model_into_mongo(scaled_train_bytes, scaled_target_bytes, args_dict)
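
And the equivalent hypothetical call for the renamed /nn_lstm_training route; model_name, host, and port are assumptions:

    import requests

    payload = {'mongodb_database': 'david_test', 'mongodb_read_table': 'j00083',
               'model_table': 'j00083_model', 'scaler_table': 'j00083_scaler',
               'model_name': 'lstm1.0.test', 'col_time': 'dateTime',
               'features': 'speed10,direction10,speed30,direction30'}
    r = requests.post('http://127.0.0.1:10103/nn_lstm_training', data=payload)
    print(r.json())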