ソースを参照

1.修改曦谋hive to mongo

anweiguo 2 週間 前
コミット
4888026061
2 ファイル変更42 行追加11 行削除
  1. 39 10
      common/database_dml.py
  2. 3 1
      data_processing/data_operation/hive_to_mongo.py

+ 39 - 10
common/database_dml.py

@@ -74,6 +74,20 @@ def get_df_list_from_mongo(args):
     return df_list
 
 
+def delete_data_from_mongo(args):
+    """
+    Drop the MongoDB collection named by args['mongodb_write_table'] if it exists.
+
+    The connection string is read from config['mongodb']['mongodb_connection'].
+    The outcome (dropped / not present) is reported with print().
+
+    Parameters:
+        args: mapping that must provide 'mongodb_database' and
+            'mongodb_write_table'.
+
+    Returns:
+        None.
+
+    Raises:
+        KeyError: if args is a dict lacking either required key.
+    """
+    mongodb_connection = config['mongodb']['mongodb_connection']
+    mongodb_database = args['mongodb_database']
+    mongodb_write_table = args['mongodb_write_table']
+    client = MongoClient(mongodb_connection)
+    try:
+        db = client[mongodb_database]
+        # Check first so the message can say whether anything was actually dropped.
+        if mongodb_write_table in db.list_collection_names():
+            db[mongodb_write_table].drop()
+            print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
+        else:
+            print(f"Collection '{mongodb_write_table}' already not exists!")
+    finally:
+        # Always release the client; otherwise every call leaves a MongoClient
+        # (and its connections) open, even when the lookup or drop raises.
+        client.close()
+
+
 def insert_data_into_mongo(res_df, args):
     """
     插入数据到 MongoDB 集合中,可以选择覆盖、追加或按指定的 key 进行更新插入。
@@ -731,15 +745,28 @@ def get_xmo_data_from_hive(args):
     password = hive_config['password']
     features = config['xmo']['features']
     numeric_features = config['xmo']['numeric_features']
-    if 'moment' not in args or 'farm_id' not in args:
-        msg_error = 'One or more of the following parameters are missing: moment, farm_id!'
+    if 'farm_id' not in args:
+        msg_error = 'One or more of the following parameters are missing: farm_id!'
         return msg_error
     else:
-        moment = args['moment']
         farm_id = args['farm_id']
-
+        if 'moment' in args:
+            moment = args['moment']
+        else:
+            hour = datetime.strptime(args['current_date'], "%Y%m%d%H").hour
+            if hour < 3:
+                moment = '00'
+            elif hour < 6:
+                moment = '03'
+            elif hour < 15:
+                moment = '09'
+            elif hour < 19:
+                moment = '12'
+            else:
+                moment = '18'
+            print(moment)
         if 'current_date' in args:
-            current_date = datetime.strptime(args['current_date'], "%Y%m%d")
+            current_date = datetime.strptime(args['current_date'], "%Y%m%d%H")
         else:
             current_date = datetime.now()
         if 'days' in args:
@@ -748,6 +775,7 @@ def get_xmo_data_from_hive(args):
             days = 1
         json_feature = f"nwp_xmo_{moment}"
         # 建立连接
+        """"""
         conn = jaydebeapi.connect(driver_class, jdbc_url, [user, password], jar_file)
         # 查询 Hive 表
         cursor = conn.cursor()
@@ -759,7 +787,7 @@ def get_xmo_data_from_hive(args):
             else:
                 query_sql += "union \n"
 
-            query_sql += """select rowkey,datatimestamp,{2} from hbase_forecast.forecast_xmo_d{3} 
+            query_sql += """select rowkey,datatimestamp,{2} from hbase_forecast.forecast_xmo_d{3}
                                                      where rowkey>='{0}-{1}0000' and rowkey<='{0}-{1}2345' \n""".format(
                 farm_id, sysdate_pre, json_feature, i)
         print("query_sql\n", query_sql)
@@ -783,14 +811,15 @@ def get_xmo_data_from_hive(args):
             return df_features[features]
 
 
+
 if __name__ == "__main__":
     print("Program starts execution!")
     args = {
-        'moment': '06',
-        'current_date': '20250609',
+        # 'moment': '06',
+        'current_date': '2025060901',
         'farm_id': 'J00883',
         'days': '13'
     }
-    df = get_xmo_data_from_hive(args)
-    print(df.head(2),df.shape)
+    get_xmo_data_from_hive(args)
+
     print("server start!")

+ 3 - 1
data_processing/data_operation/hive_to_mongo.py

@@ -2,7 +2,8 @@ from flask import Flask,request,jsonify
 import time
 import logging
 import traceback
-from common.database_dml import insert_data_into_mongo,get_xmo_data_from_hive
+from common.database_dml import insert_data_into_mongo, get_xmo_data_from_hive, delete_data_from_mongo
+
 app = Flask('hive_to_mongo——service')
 
 
@@ -22,6 +23,7 @@ def data_join():
         args = request.values.to_dict()
         print('args', args)
         logger.info(args)
+        delete_data_from_mongo(args)
         df_hive = get_xmo_data_from_hive(args)
         if isinstance(df_hive, str):
             success = 0