David, 4 months ago
Parent
Current commit
3c0f54f1dc

+ 1 - 1
common/data_cleaning.py

@@ -74,7 +74,7 @@ def rm_duplicated(data, logger):
     """
     # Deduplicate by time
     rows_pre = len(data)
-    data = data.drop_duplicates(subset='C_TIME')
+    data = data.drop_duplicates(subset='dateTime')
     rows_late = len(data)
     if rows_pre - rows_late > 0:
         logger.info("Rows removed by time deduplication: {}".format(rows_pre - rows_late))
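
For reference, a minimal sketch (not part of this commit, sample data is hypothetical) of what the renamed deduplication call does with the new dateTime column:

import pandas as pd

data = pd.DataFrame({
    'dateTime': ['2024-01-01 00:00', '2024-01-01 00:00', '2024-01-01 00:15'],
    'value': [1.0, 2.0, 3.0],
})
rows_pre = len(data)
data = data.drop_duplicates(subset='dateTime')  # keeps the first row for each timestamp
print(rows_pre - len(data))  # 1 duplicated timestamp removed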

+ 32 - 27
data_processing/data_operation/data_handler.py

@@ -6,6 +6,8 @@
 # @Company: shenyang JY
 import argparse
 import pandas as pd
+from pyexpat import features
+
 from common.data_cleaning import *
 
 class DataHandler(object):
@@ -13,58 +15,61 @@ class DataHandler(object):
         self.logger = logger
         self.opt = argparse.Namespace(**args)
 
-    def get_train_data(self, df):
+    def get_train_data(self, dfs, col_time, features, target):
         train_x, valid_x, train_y, valid_y = [], [], [], []
-        if len(df) < self.opt.Model["time_step"]:
-            self.logger.info("Feature processing - training data - does not meet time_step")
-        datax, datay = self.get_timestep_features(df, is_train=True)
-        if len(datax) < 10:
-            self.logger.info("Feature processing - training data - cannot perform minimum split")
-        tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
-        train_x.extend(tx)
-        valid_x.extend(vx)
-        train_y.extend(ty)
-        valid_y.extend(vy)
+        for i, df in enumerate(dfs, start=1):
+            if len(df) < self.opt.Model["time_step"]:
+                self.logger.info("Feature processing - training data - does not meet time_step")
+            datax, datay = self.get_timestep_features(df, col_time, features, target, is_train=True)
+            if len(datax) < 10:
+                self.logger.info("Feature processing - training data - cannot perform minimum split")
+                continue
+            tx, vx, ty, vy = self.train_valid_split(datax, datay, valid_rate=self.opt.Model["valid_data_rate"], shuffle=self.opt.Model['shuffle_train_data'])
+            train_x.extend(tx)
+            valid_x.extend(vx)
+            train_y.extend(ty)
+            valid_y.extend(vy)
 
         train_y = np.concatenate([[y.iloc[:, 1].values for y in train_y]], axis=0)
         valid_y = np.concatenate([[y.iloc[:, 1].values for y in valid_y]], axis=0)
 
-        train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
-        valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]
+        train_x = np.array([x.values for x in train_x])
+        valid_x = np.array([x.values for x in valid_x])
+        # train_x = [np.array([x[0].values for x in train_x]), np.array([x[1].values for x in train_x])]
+        # valid_x = [np.array([x[0].values for x in valid_x]), np.array([x[1].values for x in valid_x])]
 
         return train_x, valid_x, train_y, valid_y
 
-    def get_timestep_features(self, norm_data, is_train):   # optimized implementation based on pandas operations
+    def get_timestep_features(self, norm_data, col_time, features, target, is_train):   # optimized implementation based on pandas operations
         time_step = self.opt.Model["time_step"]
         feature_data = norm_data.reset_index(drop=True)
         time_step_loc = time_step - 1
         train_num = int(len(feature_data))
-        label_features = ['C_TIME', 'C_REAL_VALUE'] if is_train is True else ['C_TIME', 'C_REAL_VALUE']
-        nwp_cs = self.opt.features
+        label_features = [col_time, target] if is_train is True else [col_time, target]
+        nwp_cs = features
         nwp = [feature_data.loc[i:i + time_step_loc, nwp_cs].reset_index(drop=True) for i in range(train_num - time_step + 1)]  # database fields, e.g. 'C_T': 'C_WS170'
         labels = [feature_data.loc[i:i + time_step_loc, label_features].reset_index(drop=True) for i in range(train_num - time_step + 1)]
         features_x, features_y = [], []
-        self.logger.info("Before matching environment data: {} groups -> ".format(len(nwp)))
         for i, row in enumerate(zip(nwp, labels)):
             features_x.append(row[0])
             features_y.append(row[1])
-        self.logger.info("After matching environment data: {} groups".format(len(features_x)))
         return features_x, features_y
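
get_timestep_features now takes the time column, feature list and target explicitly. A minimal sketch of the overlapping-window slicing it performs, on toy data with hypothetical column names; get_train_data later stacks these windows into arrays of shape (samples, time_step, len(features)):

import pandas as pd

time_step = 3
df = pd.DataFrame({'dateTime': pd.date_range('2024-01-01', periods=5, freq='15T'),
                   'speed10': range(5), 'realPower': range(5)})
col_time, features, target = 'dateTime', ['speed10'], 'realPower'
time_step_loc = time_step - 1
nwp = [df.loc[i:i + time_step_loc, features].reset_index(drop=True)
       for i in range(len(df) - time_step + 1)]
labels = [df.loc[i:i + time_step_loc, [col_time, target]].reset_index(drop=True)
          for i in range(len(df) - time_step + 1)]
print(len(nwp), nwp[0].shape)  # 3 overlapping windows, each (3, 1)
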
 
-    def fill_train_data(self, unite):
-        unite['C_TIME'] = pd.to_datetime(unite['C_TIME'])
-        unite['time_diff'] = unite['C_TIME'].diff()
+    def fill_train_data(self, unite, col_time):
+        unite[col_time] = pd.to_datetime(unite[col_time])
+        unite['time_diff'] = unite[col_time].diff()
         dt_short = pd.Timedelta(minutes=15)
         dt_long = pd.Timedelta(minutes=15 * self.opt.Model['how_long_fill'])
-        data_train = self.missing_time_splite(unite, dt_short, dt_long)
+        data_train = self.missing_time_splite(unite, dt_short, dt_long, col_time)
         miss_points = unite[(unite['time_diff'] > dt_short) & (unite['time_diff'] < dt_long)]
         miss_number = miss_points['time_diff'].dt.total_seconds().sum(axis=0) / (15 * 60) - len(miss_points)
         self.logger.info("Recalculated: total number of points requiring interpolation: {}".format(miss_number))
         if miss_number > 0 and self.opt.Model["train_data_fill"]:
-            data_train = self.data_fill(data_train)
+            data_train = self.data_fill(data_train, col_time)
         return data_train
 
-    def missing_time_splite(self, df, dt_short, dt_long):
+    def missing_time_splite(self, df, dt_short, dt_long, col_time):
+        df.reset_index(drop=True, inplace=True)
         n_long, n_short, n_points = 0, 0, 0
         start_index = 0
         dfs = []
@@ -75,7 +80,7 @@ class DataHandler(object):
                 start_index = i
                 n_long += 1
             if df['time_diff'][i] > dt_short:
-                self.logger.info(f"{df['C_TIME'][i-1]} ~ {df['C_TIME'][i]}")
+                self.logger.info(f"{df[col_time][i-1]} ~ {df[col_time][i]}")
                 points = df['time_diff'].dt.total_seconds()[i]/(60*15)-1
                 self.logger.info("Number of missing points: {}".format(points))
                 if df['time_diff'][i] < dt_long:
@@ -87,11 +92,11 @@ class DataHandler(object):
         self.logger.info("Total number of points that need filling: {}".format(n_points))
         return dfs
 
-    def data_fill(self, dfs, test=False):
+    def data_fill(self, dfs, col_time, test=False):
         dfs_fill, inserts = [], 0
         for i, df in enumerate(dfs):
             df = rm_duplicated(df, self.logger)
-            df1 = df.set_index('C_TIME', inplace=False)
+            df1 = df.set_index(col_time, inplace=False)
             dff = df1.resample('15T').interpolate(method='linear')  # linear interpolation; other fill methods need further comparison
             dff.reset_index(inplace=True)
             points = len(dff) - len(df1)
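
data_fill now receives the time column name as well. A minimal sketch (toy data, hypothetical column names) of the 15-minute resample plus linear interpolation used above:

import pandas as pd

col_time = 'dateTime'
df = pd.DataFrame({col_time: pd.to_datetime(['2024-01-01 00:00', '2024-01-01 00:45']),
                   'speed10': [0.0, 3.0]})
df1 = df.set_index(col_time, inplace=False)
dff = df1.resample('15T').interpolate(method='linear')  # fills 00:15 and 00:30
dff.reset_index(inplace=True)
print(len(dff) - len(df1))  # 2 interpolated points inserted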

+ 3 - 0
models_processing/losses/loss_cdq.py

@@ -9,6 +9,9 @@ import tensorflow as tf
 tf.compat.v1.set_random_seed(1234)
 
 
+def rmse(y_true, y_pred):
+    return K.sqrt(K.mean(K.square(y_pred - y_true)))
+
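
The new rmse helper relies on the Keras backend K; whether loss_cdq.py already imports it is not visible in this hunk, so the import below is an assumption. A standalone sketch of the same loss:

import tensorflow as tf
from tensorflow.keras import backend as K  # assumed import; not shown in the hunk above

def rmse(y_true, y_pred):
    # root-mean-square error over all elements in the batch
    return K.sqrt(K.mean(K.square(y_pred - y_true)))

y_true = tf.constant([[0.0, 1.0], [2.0, 3.0]])
y_pred = tf.constant([[0.0, 2.0], [2.0, 5.0]])
print(float(rmse(y_true, y_pred)))  # sqrt((0 + 1 + 0 + 4) / 4) ≈ 1.118
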
 class SouthLoss(tf.keras.losses.Loss):
     def __init__(self, opt, name='south_loss'):
         """

+ 45 - 29
models_processing/model_koi/nn_bp.py

@@ -19,20 +19,21 @@ from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping, TensorBoa
 from tensorflow.keras import optimizers, regularizers
 import tensorflow.keras.backend as K
 import tensorflow as tf
-from common.data_cleaning import cleaning
+from bson.decimal128 import Decimal128
+from common.data_cleaning import cleaning, key_field_row_cleaning
 from common.database_dml import *
 from common.processing_data_common import missing_features, str_to_list
 from data_processing.data_operation.data_handler import DataHandler
 from threading import Lock
 import time, yaml
-import random
+import random, numbers
 import matplotlib.pyplot as plt
 model_lock = Lock()
 from common.logs import Log
 logger = logging.getLogger()
 # logger = Log('models-processing').logger
 np.random.seed(42)  # NumPy random seed
-tf.random.set_random_seed(42)  # TensorFlow random seed
+# tf.set_random_seed(42)  # TensorFlow random seed
 app = Flask('nn_bp——service')
 
 with app.app_context():
@@ -41,26 +42,42 @@ with app.app_context():
 
 dh = DataHandler(logger, arguments)
 def train_data_handler(data, opt):
+    """
+    Training-data preprocessing:
+    cleaning + gap filling + normalization
+    Args:
+        data: data loaded from mongo
+        opt: argument namespace
+    Returns:
+        x_train
+        x_valid
+        y_train
+        y_valid
+    """
     col_time, features, target = opt.col_time, opt.features, opt.target
+    # Drop records already flagged as curtailment (is_limit)
     if 'is_limit' in data.columns:
         data = data[data['is_limit'] == False]
+    # Select feature columns; convert to numeric
+    train_data = data[[col_time]+features+[target]]
     # Drop days whose average feature missing rate exceeds 20%
-    data = missing_features(data, features, col_time)
-    train_data = data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
-
+    train_data = missing_features(train_data, features, col_time)
     train_data = train_data.sort_values(by=col_time)
+    # train_data = train_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
     # Feature preprocessing on the curtailment-filtered data: 1. clean nulls and outliers 2. fill missing values
-    train_data_cleaned = cleaning(train_data, 'nn_bp:features', logger, features)
-    train_data = dh.fill_train_data(train_data_cleaned)
+    train_data_cleaned = key_field_row_cleaning(train_data, features+[target], logger)
+    train_data_cleaned = train_data_cleaned.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
     # Create the scaler for features and target
     train_scaler = MinMaxScaler(feature_range=(0, 1))
     # Scale features and target
-    scaled_train_data = train_scaler.fit_transform(train_data[features+[target]])
+    scaled_train_data = train_scaler.fit_transform(train_data_cleaned[features+[target]])
+    train_data_cleaned[features+[target]] = scaled_train_data
+    train_datas = dh.fill_train_data(train_data_cleaned, col_time)
     # Save the scaler
     scaled_train_bytes = BytesIO()
     joblib.dump(scaled_train_data, scaled_train_bytes)
     scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
-    x_train, x_valid, y_train, y_valid = dh.get_train_data(scaled_train_data)
+    x_train, x_valid, y_train, y_valid = dh.get_train_data(train_datas, col_time, features, target)
     return x_train, x_valid, y_train, y_valid, scaled_train_bytes
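
Note that the byte stream above is built by dumping scaled_train_data (the transformed array) rather than the fitted train_scaler. If the intent is to reuse the scaler later, for example for inverse transforms at prediction time, a minimal sketch of persisting the scaler itself (toy data, assumed intent) would be:

from io import BytesIO
import joblib
import numpy as np
from sklearn.preprocessing import MinMaxScaler

train_scaler = MinMaxScaler(feature_range=(0, 1))
scaled = train_scaler.fit_transform(np.array([[0.0, 10.0], [5.0, 20.0]]))  # toy data

scaler_bytes = BytesIO()
joblib.dump(train_scaler, scaler_bytes)  # persist the fitted scaler, not the scaled array
scaler_bytes.seek(0)
restored = joblib.load(scaler_bytes)
print(restored.inverse_transform(scaled))  # recovers the original values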
 
 def pre_data_handler(data, args):
@@ -92,22 +109,20 @@ class BPHandler(object):
     def get_keras_model(opt):
         # db_loss = NorthEastLoss(opt)
         # south_loss = SouthLoss(opt)
+        from models_processing.losses.loss_cdq import rmse
         l1_reg = regularizers.l1(opt.Model['lambda_value_1'])
         l2_reg = regularizers.l2(opt.Model['lambda_value_2'])
-        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size_nwp']), name='nwp')
-        env_input = Input(shape=(opt.Model['his_points'], opt.Model['input_size_env']), name='env')
+        nwp_input = Input(shape=(opt.Model['time_step'], opt.Model['input_size']), name='nwp')
 
-        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu',
-                      kernel_regularizer=l2_reg)(nwp_input)
+        con1 = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu', kernel_regularizer=l2_reg)(nwp_input)
         d1 = Dense(32, activation='relu', name='d1', kernel_regularizer=l1_reg)(con1)
         nwp = Dense(8, activation='relu', name='d2', kernel_regularizer=l1_reg)(d1)
 
         output = Dense(opt.Model['output_size'], name='d5')(nwp)
-        model = Model([env_input, nwp_input], output)
-        adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7,
-                               amsgrad=True)
+        model = Model(nwp_input, output)
+        adam = optimizers.Adam(learning_rate=opt.Model['learning_rate'], beta_1=0.9, beta_2=0.999, epsilon=1e-7, amsgrad=True)
         reduce_lr = ReduceLROnPlateau(monitor='val_loss', factor=0.01, patience=5, verbose=1)
-        model.compile(loss='rmse', optimizer=adam)
+        model.compile(loss=rmse, optimizer=adam)
         return model
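
With the env branch removed, the network is single-input. A minimal sketch of the resulting Conv1D/Dense stack and the shape it produces, with hypothetical hyper-parameter values and regularizers omitted:

from tensorflow.keras.layers import Input, Conv1D, Dense
from tensorflow.keras.models import Model

time_step, input_size, output_size = 16, 16, 1  # assumed values of opt.Model[...]
nwp_input = Input(shape=(time_step, input_size), name='nwp')
x = Conv1D(filters=64, kernel_size=1, strides=1, padding='valid', activation='relu')(nwp_input)
x = Dense(32, activation='relu', name='d1')(x)
x = Dense(8, activation='relu', name='d2')(x)
output = Dense(output_size, name='d5')(x)
model = Model(nwp_input, output)
print(model.output_shape)  # (None, 16, 1): one prediction per time step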
 
     def train_init(self, opt):
@@ -125,16 +140,14 @@ class BPHandler(object):
 
     def training(self, opt, train_and_valid_data):
         model = self.train_init(opt)
-        tf.reset_default_graph() # clear the default graph
+        # tf.reset_default_graph() # clear the default graph
         train_x, train_y, valid_x, valid_y = train_and_valid_data
         print("----------", np.array(train_x[0]).shape)
         print("++++++++++", np.array(train_x[1]).shape)
 
-        check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss',
-                                      save_best_only=True, mode='auto')
+        check_point = ModelCheckpoint(filepath='./var/' + 'fmi.h5', monitor='val_loss',  save_best_only=True, mode='auto')
         early_stop = EarlyStopping(monitor='val_loss', patience=opt.Model['patience'], mode='auto')
-        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,
-                            validation_data=(valid_x, valid_y), callbacks=[check_point, early_stop], shuffle=False)
+        history = model.fit(train_x, train_y, batch_size=opt.Model['batch_size'], epochs=opt.Model['epoch'], verbose=2,  validation_data=(valid_x, valid_y), callbacks=[check_point, early_stop], shuffle=False)
         loss = np.round(history.history['loss'], decimals=5)
         val_loss = np.round(history.history['val_loss'], decimals=5)
         self.logger.info("-----Model training ran for {} epochs-----".format(len(loss)))
@@ -225,13 +238,16 @@ if __name__ == "__main__":
     print("server start!")
 
     bp = BPHandler(logger)
-    args = copy.deepcopy(bp)
-    opt = argparse.Namespace(**arguments)
-    logger.info(args)
     args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
-            'model_table': 'j00083_model', 'mongodb_read_table': 'j00083'}
+    'model_table': 'j00083_model', 'mongodb_read_table': 'j00083', 'col_time': 'dateTime',
+    'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
+    args_dict['features'] = args_dict['features'].split(',')
+    arguments.update(args_dict)
+    opt = argparse.Namespace(**arguments)
+    opt.Model['input_size'] = len(opt.features)
     train_data = get_data_from_mongo(args_dict)
     train_x, valid_x, train_y, valid_y, scaled_train_bytes = train_data_handler(train_data, opt)
-    bp_model = bp.training(opt, [train_x, valid_x, train_y, valid_y])
+
+    bp_model = bp.training(opt, [train_x, train_y, valid_x, valid_y])
     insert_trained_model_into_mongo(bp_model, args_dict)
-    insert_scaler_model_into_mongo(scaled_train_bytes, args)
+    insert_scaler_model_into_mongo(scaled_train_bytes, args_dict)
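
A minimal sketch (hypothetical values) of how the comma-separated feature string in args_dict becomes the model's input width, mirroring the __main__ block above:

import argparse

arguments = {'Model': {'input_size': None}}  # stands in for the pre-loaded config
args_dict = {'col_time': 'dateTime',
             'features': 'speed10,direction10,speed30,direction30'}
args_dict['features'] = args_dict['features'].split(',')  # comma-separated string -> list of columns
arguments.update(args_dict)
opt = argparse.Namespace(**arguments)
opt.Model['input_size'] = len(opt.features)  # network input width follows the feature count
print(opt.Model['input_size'])  # 4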