David 2 months ago
parent
commit
e82cc16a59

+ 4 - 4
common/data_cleaning.py

@@ -7,12 +7,12 @@
 import numpy as np
 np.random.seed(42)
 
-def cleaning(df, name, logger, cols=None, dup=True):
+def cleaning(df, name, logger, cols=None, dup=True, col_time='dateTime'):
     logger.info("Start cleaning: {} ...".format(name))
     data = df.copy()
     data = data_column_cleaning(data, logger)
     if dup:
-        data = rm_duplicated(data, logger)
+        data = rm_duplicated(data, logger, col_time)
     if cols is not None:
         data = key_field_row_cleaning(data, cols, logger)
     return data
@@ -65,7 +65,7 @@ def key_field_row_cleaning(data, cols, logger):
         logger.info("Row cleaning: rows removed: {}, columns with missing values: {}".format(rows_pre-rows_late, ', '.join(nan_cols)))
     return data
 
-def rm_duplicated(data, logger):
+def rm_duplicated(data, logger, col_time='dateTime'):
     """
     Deduplicate rows by time
     :param data:
@@ -74,7 +74,7 @@ def rm_duplicated(data, logger):
     """
     # Deduplicate by time
     rows_pre = len(data)
-    data = data.drop_duplicates(subset='dateTime')
+    data = data.drop_duplicates(subset=col_time)
     rows_late = len(data)
     if rows_pre - rows_late > 0:
         logger.info("Rows removed by time deduplication: {}".format(rows_pre - rows_late))
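
With this commit the time column used for deduplication is configurable instead of hard-coded to 'dateTime'. A minimal usage sketch of the new col_time parameter (the toy DataFrame, logger name and column names below are illustrative, and it assumes the repository root is on the import path):

import logging
import pandas as pd
from common.data_cleaning import cleaning

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("cleaning_demo")

# Toy frame whose time column is 'date_time' rather than the old default 'dateTime'
df = pd.DataFrame({
    'date_time': ['2024-01-01 00:00', '2024-01-01 00:00', '2024-01-01 00:15'],
    'speed10':   [3.2, 3.2, None],
    'power':     [1.1, 1.1, 0.9],
})

# col_time is forwarded to rm_duplicated, so deduplication now keys on 'date_time'
cleaned = cleaning(df, 'demo', logger, cols=['speed10', 'power'], col_time='date_time')
print(cleaned)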

+ 13 - 11
data_processing/data_operation/data_handler.py

@@ -172,18 +172,19 @@ class DataHandler(object):
             y_valid
         """
         col_time, features, target = opt.col_time, opt.features, opt.target
-        # Filter out the processed curtailment records
+        # Filter out curtailment records
         if 'is_limit' in data.columns:
             data = data[data['is_limit'] == False]
-        # Select features and convert to numeric
+        # Select features, convert to numeric, and sort
         train_data = data[[col_time] + features + [target]]
+        train_data = train_data.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        train_data = train_data.sort_values(by=col_time)
         # Drop days whose average feature missing rate exceeds 20%
         # train_data = missing_features(train_data, features, col_time)
-        train_data = train_data.sort_values(by=col_time)
-        # train_data = train_data.sort_values(by=col_time).fillna(method='ffill').fillna(method='bfill')
-        # Feature preprocessing after curtailment filtering: 1. clean nulls/outliers 2. fill missing values
-        train_data_cleaned = key_field_row_cleaning(train_data, features + [target], self.logger)
-        train_data_cleaned = train_data_cleaned.applymap(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else x)
+        # Feature preprocessing after curtailment filtering:
+        # 1. Clean nulls and outliers
+        train_data_cleaned = cleaning(train_data, 'training set', self.logger, features + [target], col_time=col_time)
+        # 2. Normalize with min-max scaling
         # Create scalers for the features and the target
         train_scaler = MinMaxScaler(feature_range=(0, 1))
         target_scaler = MinMaxScaler(feature_range=(0, 1))
@@ -192,12 +193,11 @@ class DataHandler(object):
         scaled_target = target_scaler.fit_transform(train_data_cleaned[[target]])
         train_data_cleaned[features] = scaled_train_data
         train_data_cleaned[[target]] = scaled_target
-
+        # 3. Fill missing values
         train_datas = self.fill_train_data(train_data_cleaned, col_time)
         # Save the two scalers
         scaled_train_bytes = BytesIO()
         scaled_target_bytes = BytesIO()
-
         joblib.dump(train_scaler, scaled_train_bytes)
         joblib.dump(target_scaler, scaled_target_bytes)
         scaled_train_bytes.seek(0)  # Reset pointer to the beginning of the byte stream
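
For context on the scaler persistence above: joblib serializes to any file-like object, so dumping into a BytesIO and rewinding yields raw bytes that can be stored in a document database. A small standalone sketch with toy values (not the handler's code):

from io import BytesIO

import joblib
import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Fit a scaler on toy target values
scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(np.array([[0.0], [5.0], [10.0]]))

# Serialize into an in-memory buffer and rewind, as the handler does
buf = BytesIO()
joblib.dump(scaler, buf)
buf.seek(0)
raw_bytes = buf.read()  # what would end up in the scaler collection

# Restore from the bytes and reuse
restored = joblib.load(BytesIO(raw_bytes))
print(restored.transform(np.array([[2.5]])))  # [[0.25]]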
@@ -220,6 +220,7 @@ class DataHandler(object):
         return:
             scaled_features: 反归一化的特征
         """
+        # Filter out curtailment records
         if 'is_limit' in data.columns:
             data = data[data['is_limit'] == False]
         # features, time_steps, col_time, model_name, col_reserve = str_to_list(args['features']), int(
@@ -229,7 +230,8 @@ class DataHandler(object):
         scaled_features = feature_scaler.transform(pre_data[features])
         pre_data[features] = scaled_features
         if bp_data:
-            pre_x = self.get_predict_data([pre_data], features)
+            pre_x = np.array(pre_data)
         else:
-            pre_x = pre_data.values
+            pre_x = self.get_predict_data([pre_data], features)
         return pre_x
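
The applymap call moved above the cleaning step normalizes MongoDB numeric types before anything else touches the data. The same conversion written out as a named helper (toy DataFrame; Decimal128 comes from pymongo's bson package, which the handler already relies on):

import numbers
import pandas as pd
from bson.decimal128 import Decimal128

def to_float(x):
    # Mongo Decimal128 -> Decimal -> float; other numbers -> float; leave non-numeric values untouched
    if isinstance(x, Decimal128):
        return float(x.to_decimal())
    if isinstance(x, numbers.Number):
        return float(x)
    return x

df = pd.DataFrame({
    'dateTime':  ['2024-01-01 00:00', '2024-01-01 00:15'],
    'speed10':   [Decimal128('3.20'), 4],
    'realPower': [Decimal128('1.10'), Decimal128('0.95')],
})

df = df.applymap(to_float)  # same effect as the lambda in train_data_handler
print(df.dtypes)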

+ 39 - 38
models_processing/model_koi/tf_bp_pre.py

@@ -25,9 +25,8 @@ app = Flask('tf_bp_pre——service')
 
 with app.app_context():
     with open('../model_koi/bp.yaml', 'r', encoding='utf-8') as f:
-        arguments = yaml.safe_load(f)
-
-    dh = DataHandler(logger, arguments)
+        args = yaml.safe_load(f)
+    dh = DataHandler(logger, args)
     bp = BPHandler(logger)
 
 
@@ -38,15 +37,17 @@ def model_prediction_bp():
     result = {}
     success = 0
     print("Program starts execution!")
-    params_dict = request.values.to_dict()
-    args = arguments.deepcopy()
-    args.update(params_dict)
+    # ------------ Assemble and merge request parameters ------------
+    args_dict = request.values.to_dict()
+    args_dict['features'] = args_dict['features'].split(',')
+    args.update(args_dict)
+    opt = argparse.Namespace(**args)
+    logger.info(args)
     try:
-        print('args', args)
-        logger.info(args)
+        # ------------ Fetch data and preprocess it for prediction ------------
         pre_data = get_data_from_mongo(args)
         feature_scaler, target_scaler = get_scaler_model_from_mongo(args)
-        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, args, bp_data=True)
+        scaled_pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt, bp_data=True)
         bp.get_model(args)
         # result = bp.predict(scaled_pre_x, args)
         result = list(chain.from_iterable(target_scaler.inverse_transform([bp.predict(scaled_pre_x).flatten()])))
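
The one-liner above flattens the model output, maps it back to the original power scale with the stored target scaler, and unpacks it into a flat list. The same steps spelled out with toy values (bp.predict is assumed to return a 2-D array of scaled predictions):

from itertools import chain

import numpy as np
from sklearn.preprocessing import MinMaxScaler

# Toy target scaler fit on powers in [0, 50]
target_scaler = MinMaxScaler(feature_range=(0, 1))
target_scaler.fit(np.array([[0.0], [50.0]]))

# Pretend network output: shape (n_samples, 1) in scaled space
predictions = np.array([[0.2], [0.5], [0.9]])

row = [predictions.flatten()]                    # shape (1, 3): one row of scaled values
restored = target_scaler.inverse_transform(row)  # back to original units
result = list(chain.from_iterable(restored))     # [10.0, 25.0, 45.0]
print(result)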
@@ -55,14 +56,14 @@ def model_prediction_bp():
         pre_data['cdq'] = 1
         pre_data['dq'] = 1
         pre_data['zq'] = 1
-        pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+        pre_data.rename(columns={args['col_time']: 'date_time'}, inplace=True)
         pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
 
         pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
         pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
         pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
 
-        insert_data_into_mongo(pre_data, arguments)
+        insert_data_into_mongo(pre_data, args)
         success = 1
     except Exception as e:
         my_exception = traceback.format_exc()
@@ -84,36 +85,36 @@ if __name__ == "__main__":
     logger = logging.getLogger("model_training_bp log")
     from waitress import serve
 
-    # serve(app, host="0.0.0.0", port=1010x, threads=4)
+    serve(app, host="0.0.0.0", port=1010, threads=4)
     print("server start!")
 
     # ------------------------ test code ------------------------
     args_dict = {"mongodb_database": 'david_test', 'scaler_table': 'j00083_scaler', 'model_name': 'bp1.0.test',
                  'model_table': 'j00083_model', 'mongodb_read_table': 'j00083_test', 'col_time': 'date_time', 'mongodb_write_table': 'j00083_rs',
                  'features': 'speed10,direction10,speed30,direction30,speed50,direction50,speed70,direction70,speed90,direction90,speed110,direction110,speed150,direction150,speed170,direction170'}
-    args_dict['features'] = args_dict['features'].split(',')
-    arguments.update(args_dict)
-    dh = DataHandler(logger, arguments)
-    bp = BPHandler(logger)
-    opt = argparse.Namespace(**arguments)
-
-    opt.Model['input_size'] = len(opt.features)
-    pre_data = get_data_from_mongo(args_dict)
-    feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
-    pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt, bp_data=True)
-    bp.get_model(arguments)
-    result = bp.predict(pre_x)
-    result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
-    pre_data['power_forecast'] = result1[:len(pre_data)]
-    pre_data['farm_id'] = 'J00083'
-    pre_data['cdq'] = 1
-    pre_data['dq'] = 1
-    pre_data['zq'] = 1
-    pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
-    pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
-
-    pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
-    pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
-    pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
-
-    insert_data_into_mongo(pre_data, arguments)
+    # args_dict['features'] = args_dict['features'].split(',')
+    # arguments.update(args_dict)
+    # dh = DataHandler(logger, arguments)
+    # bp = BPHandler(logger)
+    # opt = argparse.Namespace(**arguments)
+    #
+    # opt.Model['input_size'] = len(opt.features)
+    # pre_data = get_data_from_mongo(args_dict)
+    # feature_scaler, target_scaler = get_scaler_model_from_mongo(arguments)
+    # pre_x = dh.pre_data_handler(pre_data, feature_scaler, opt, bp_data=True)
+    # bp.get_model(arguments)
+    # result = bp.predict(pre_x)
+    # result1 = list(chain.from_iterable(target_scaler.inverse_transform([result.flatten()])))
+    # pre_data['power_forecast'] = result1[:len(pre_data)]
+    # pre_data['farm_id'] = 'J00083'
+    # pre_data['cdq'] = 1
+    # pre_data['dq'] = 1
+    # pre_data['zq'] = 1
+    # pre_data.rename(columns={arguments['col_time']: 'date_time'}, inplace=True)
+    # pre_data = pre_data[['date_time', 'power_forecast', 'farm_id', 'cdq', 'dq', 'zq']]
+    #
+    # pre_data['power_forecast'] = pre_data['power_forecast'].round(2)
+    # pre_data.loc[pre_data['power_forecast'] > opt.cap, 'power_forecast'] = opt.cap
+    # pre_data.loc[pre_data['power_forecast'] < 0, 'power_forecast'] = 0
+    #
+    # insert_data_into_mongo(pre_data, arguments)
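
The request handler above layers the request parameters over the YAML defaults and wraps the merged dict in an argparse.Namespace for attribute-style access. A minimal sketch of that merging step (toy defaults and request values, not the service code; it copies the defaults instead of updating the shared dict in place):

import argparse

# Defaults as they might come from bp.yaml (illustrative values only)
defaults = {'col_time': 'date_time', 'cap': 50.0, 'features': ['speed10', 'direction10']}

# What Flask's request.values.to_dict() would deliver: flat strings
request_params = {
    'features': 'speed10,direction10,speed30,direction30',
    'mongodb_read_table': 'j00083_test',
}

args = dict(defaults)                                    # keep the YAML defaults untouched
request_params['features'] = request_params['features'].split(',')
args.update(request_params)                              # request values win over defaults
opt = argparse.Namespace(**args)

print(opt.features)  # ['speed10', 'direction10', 'speed30', 'direction30']
print(opt.cap)       # 50.0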