
awg commit algorithm components

anweiguo, 4 months ago
Parent commit 9db13046db

+ 3 - 3
data_processing/data_operation/data_join.py

@@ -45,9 +45,9 @@ def insert_data_into_mongo(res_df,args):
 
 
 #1. AGC/AVC signal-based curtailment detection (accurate at some stations, not at others); one approach. The database data has issues, so it cannot be used yet
-def  data_join(df_list, args):
+def data_merge(df_list, args):
     join_key,join_type = args['join_key'], args['join_type']
-    result = reduce(lambda left, right: pd.merge(left, right, how='join_type', on=join_key), df_list)
+    result = reduce(lambda left, right: pd.merge(left, right, how=join_type, on=join_key), df_list)
     return result
 
 
@@ -63,7 +63,7 @@ def data_join():
         print('args',args)
         logger.info(args)
         df_list = get_data_from_mongo(args)
-        res_df = data_join(df_list,args)
+        res_df = data_merge(df_list,args)
         insert_data_into_mongo(res_df,args)
         success = 1
     except Exception as e:
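
The hunk above fixes two things at once: pd.merge was being handed the quoted literal 'join_type' instead of the variable, and the helper shared its name with the Flask route handler data_join() defined later in the module, so the route's call resolved to itself. A minimal sketch of the corrected reduce-based merge, with purely illustrative sample frames:

import pandas as pd
from functools import reduce

def data_merge(df_list, args):
    join_key, join_type = args['join_key'], args['join_type']
    # `how` must receive the variable; the old literal 'join_type' is not a valid join kind
    return reduce(lambda left, right: pd.merge(left, right, how=join_type, on=join_key), df_list)

# hypothetical frames for illustration only
a = pd.DataFrame({'ts': [1, 2], 'power': [10.0, 12.0]})
b = pd.DataFrame({'ts': [1, 2], 'ws': [5.1, 6.3]})
print(data_merge([a, b], {'join_key': 'ts', 'join_type': 'inner'}))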

+ 3 - 3
data_processing/processing_limit_power/processing_limit_power_by_statistics_light.py

@@ -7,7 +7,7 @@ import traceback
 from sklearn.linear_model import LinearRegression
 import numpy as np
 from bson.decimal128 import Decimal128
-
+import numbers
 app = Flask('processing_limit_power_by_statistics_light——service')
 
 
@@ -53,8 +53,8 @@ def light_statistics_judgement(df_power,args):
     col_radiance, col_power, sigma=args['col_radiance'],args['col_power'],float(args['sigma'])
     origin_records = df_power.shape[0]
     # extract irradiance and actual power
-    df_power[col_radiance] = df_power[col_radiance].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else np.nan)
-    df_power[col_power] = df_power[col_power].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else np.nan)
+    df_power[col_radiance] = df_power[col_radiance].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else np.nan)
+    df_power[col_power] = df_power[col_power].apply(lambda x: float(x.to_decimal()) if isinstance(x, Decimal128) else float(x) if isinstance(x, numbers.Number) else np.nan)
     df_power = df_power[(~pd.isna(df_power[col_radiance]))&(~pd.isna(df_power[col_power]))&(~((df_power[col_radiance]<=0)&(df_power[col_power]>0)))] 
     
     X = df_power[[col_radiance]].values
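
The widened conversion keeps plain ints and floats instead of coercing everything that is not Decimal128 to NaN. Pulled out as a standalone helper, a sketch of the same logic (to_float is a hypothetical name, not in the source):

import numbers
import numpy as np
from bson.decimal128 import Decimal128

def to_float(x):
    # BSON Decimal128 converts via to_decimal(); ordinary numbers pass through;
    # anything else (strings, None) becomes NaN and is filtered out downstream
    if isinstance(x, Decimal128):
        return float(x.to_decimal())
    if isinstance(x, numbers.Number):
        return float(x)
    return np.nan

# e.g. df_power[col_power] = df_power[col_power].apply(to_float)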

+ 10 - 1
data_processing/processing_limit_power/processing_limit_power_by_statistics_wind.py

@@ -7,8 +7,10 @@ import traceback
 import numpy as np
 from sklearn.preprocessing import StandardScaler
 from sklearn.cluster import DBSCAN
+import numbers
+from bson.decimal128 import Decimal128
 
-app = Flask('processing_limit_power_by_statistics_light——service')
+app = Flask('processing_limit_power_by_statistics_wind——service')
 
 
 @app.route('/hello', methods=['POST'])
@@ -53,6 +55,13 @@ def wind_statistics_judgement(df_power,args):
     col_ws, col_power, eps, min_samples, ws_in, ws_out, ws_rated, cap = (args['col_ws'], args['col_power'], args['eps'],
                                     args['min_samples'], args['ws_in'], args['ws_out'], args['ws_rated'], args['cap'])
     print("min_sample", min_samples)
+
+    df_power[col_ws] = df_power[col_ws].apply(
+        lambda x: float(x.to_decimal()) if isinstance(x, Decimal128)
+        else float(x) if isinstance(x, numbers.Number) else np.nan)
+    df_power[col_power] = df_power[col_power].apply(
+        lambda x: float(x.to_decimal()) if isinstance(x, Decimal128)
+        else float(x) if isinstance(x, numbers.Number) else np.nan)
     df_tmp = df_power[(~np.isnan(df_power[col_ws])) & (~np.isnan(df_power[col_power]))]
     # standardize the data
     data = df_tmp[[col_ws, col_power]].values
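
Past this hunk the values are standardized and clustered with DBSCAN (per the imports added above); that code is not shown in the excerpt, so what follows is only a hedged sketch of how such a pipeline typically flags outliers, with illustrative eps/min_samples and made-up (wind speed, power) pairs:

import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.cluster import DBSCAN

# hypothetical (wind speed, power) records
data = np.array([[5.0, 100.0], [5.1, 110.0], [5.2, 105.0], [12.0, 0.0]])
scaled = StandardScaler().fit_transform(data)
labels = DBSCAN(eps=0.5, min_samples=2).fit_predict(scaled)
print(labels)  # -1 marks noise points, e.g. the curtailed-looking (12.0, 0.0) record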

+ 10 - 2
models_processing/model_predict/model_prediction_lstm.py

@@ -87,8 +87,9 @@ def create_sequences(data_features,data_target,time_steps):
         return np.array(X), np.array(y)
 
 def model_prediction(df,args):
-    mongodb_connection, mongodb_database, scaler_table, features, time_steps = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
-                                        args['mongodb_database'], args['scaler_table'],args['features'],args['time_steps'])
+
+    mongodb_connection, mongodb_database, scaler_table, features, time_steps, col_time = ("mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",
+                                        args['mongodb_database'], args['scaler_table'], str_to_list(args['features']),int(args['time_steps']),args['col_time'])
     client = MongoClient(mongodb_connection)
     # select the database (MongoDB creates it automatically if it does not exist)
     db = client[mongodb_database]
@@ -96,10 +97,12 @@ def model_prediction(df,args):
     # Retrieve the scalers from MongoDB
     scaler_doc = collection.find_one()
     # Deserialize the scalers
+
     feature_scaler_bytes = BytesIO(scaler_doc["feature_scaler"])
     feature_scaler = joblib.load(feature_scaler_bytes)
     target_scaler_bytes = BytesIO(scaler_doc["target_scaler"])
     target_scaler = joblib.load(target_scaler_bytes)
+    df = df.fillna(method='ffill').fillna(method='bfill').sort_values(by=col_time)
     scaled_features = feature_scaler.transform(df[features])
     X_predict, _ = create_sequences(scaled_features, [], time_steps)
     # pass the custom loss function when loading the model
@@ -110,6 +113,11 @@ def model_prediction(df,args):
     result['predict'] = y_predict
     return result
 
+def str_to_list(arg):
+    if arg == '':
+        return []
+    else:
+        return arg.split(',')
 
 @app.route('/model_prediction_lstm', methods=['POST'])
 def model_prediction_lstm():
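
Flask request args arrive as strings, hence the int() and str_to_list() coercions added above; the '' special case matters because ''.split(',') returns [''], not []. A quick check, plus the ffill/bfill/sort step applied before scaling (fillna(method=...) matches the pandas generation pinned here; newer pandas spells it df.ffill().bfill()):

import pandas as pd

def str_to_list(arg):
    return [] if arg == '' else arg.split(',')

assert str_to_list('') == []            # not ['']
assert str_to_list('ws,temp') == ['ws', 'temp']

# hypothetical frame: fill gaps, then order by the time column
df = pd.DataFrame({'t': [2, 1, 3], 'ws': [None, 5.0, 6.0]})
df = df.fillna(method='ffill').fillna(method='bfill').sort_values(by='t')
print(df)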

+ 2 - 2
models_processing/model_train/model_training_lightgbm.py

@@ -59,10 +59,10 @@ def build_model(df,args):
         'verbose':1
     }
 
-    merged_param = params | model_params
+    params.update(model_params)
     # train the model
     print('Starting training...')
-    gbm = lgb.train(merged_param,
+    gbm = lgb.train(params,
                     lgb_train,
                     num_boost_round=num_boost_round,
                     valid_sets=[lgb_train, lgb_eval],
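
The dict-union operator | requires Python 3.9+; params.update() is the backwards-compatible replacement, at the cost of mutating params in place. For illustration:

defaults = {'learning_rate': 0.1, 'verbose': 1}
overrides = {'learning_rate': 0.05}

defaults.update(overrides)        # works on any Python version; mutates `defaults`
# merged = defaults | overrides   # Python 3.9+ only; leaves both dicts untouched
print(defaults)                   # {'learning_rate': 0.05, 'verbose': 1}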

+ 4 - 4
models_processing/model_train/model_training_lstm.py

@@ -94,15 +94,15 @@ def create_sequences(data_features,data_target,time_steps):
 
 
 def build_model(data, args):
-    begin_time, end_time, col_time, time_steps,features,target = args['begin_time'], args['end_time'], args['col_time'], args['time_steps'], args['features'],args['target']
-    train_data = data[(data[col_time] >= begin_time)&(data[col_time] < end_time)]
+    col_time, time_steps, features, target = args['col_time'], int(args['time_steps']), str_to_list(args['features']), args['target']
+    train_data = data.fillna(method='ffill').fillna(method='bfill').sort_values(by=col_time)
     # X_train, X_test, y_train, y_test = process_data(df_clean, params)
     # create scalers for the features and the target
     feature_scaler = MinMaxScaler(feature_range=(0, 1))
     target_scaler = MinMaxScaler(feature_range=(0, 1))
     # scale the features and the target
-    scaled_features = feature_scaler.fit_transform(data[features])
-    scaled_target = target_scaler.fit_transform(data[[target]])
+    scaled_features = feature_scaler.fit_transform(train_data[features])
+    scaled_target = target_scaler.fit_transform(train_data[[target]])
     # save both scalers
     feature_scaler_bytes = BytesIO()
     joblib.dump(feature_scaler, feature_scaler_bytes)
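
The hunk continues by dumping both scalers into BytesIO buffers whose bytes are stored in MongoDB, and model_prediction_lstm.py reloads them the same way. A round-trip sketch under those assumptions:

from io import BytesIO
import joblib
import numpy as np
from sklearn.preprocessing import MinMaxScaler

scaler = MinMaxScaler(feature_range=(0, 1))
scaler.fit(np.array([[0.0], [10.0]]))

buf = BytesIO()
joblib.dump(scaler, buf)                          # bytes the training side stores
restored = joblib.load(BytesIO(buf.getvalue()))   # prediction side reloads them
print(restored.transform(np.array([[5.0]])))      # [[0.5]]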

+ 2 - 1
requirements.txt

@@ -14,4 +14,5 @@ lightgbm==4.5.0
 joblib==1.3.2
 tensorflow==2.2.0
 matplotlib==3.5.3
-Keras==2.3.1
+Keras==2.3.1
+protobuf==3.20.3