Pārlūkot izejas kodu

修改曦谋获取数据源逻辑

anweiguo 2 dienas atpakaļ
vecāks
revīzija
0b1628ead5
1 mainīts fails ar 24 papildinājumiem un 27 dzēšanām
  1. 24 27
      common/database_dml.py

+ 24 - 27
common/database_dml.py

@@ -15,6 +15,8 @@ import toml
 from typing import Dict, Any, Optional, Union, Tuple
 from datetime import datetime, timedelta
 
+from common.alert import send_message
+
 # 读取 toml 配置文件
 current_dir = os.path.dirname(os.path.abspath(__file__))
 with open(os.path.join(current_dir, 'database.toml'), 'r', encoding='utf-8') as f:
@@ -783,30 +785,26 @@ def get_xmo_data_from_hive(args):
         conn = jaydebeapi.connect(driver_class, jdbc_url, [user, password], jar_file)
         # 查询 Hive 表
         cursor = conn.cursor()
-        query_sql = ""
+        all_rows = []
+
         for i in range(0, days):
             sysdate_pre = (current_date + timedelta(days=i)).strftime("%Y%m%d")
-            if i == 0:
-                pass
-            else:
-                query_sql += "union \n"
-
-            query_sql += """select rowkey,datatimestamp,{2} from hbase_forecast.forecast_xmo_d{3}
-                                                     where rowkey>='{0}-{1}0000' and rowkey<='{0}-{1}2345' \n""".format(
+            query_sql = """select rowkey,datatimestamp,{2} from hbase_forecast.forecast_xmo_d{3}
+                                                     where rowkey>='{0}-{1}0000' and rowkey<='{0}-{1}2345'""".format(
                 farm_id, sysdate_pre, json_feature, i)
-        print("query_sql\n", query_sql)
-        cursor.execute(query_sql)
-        # 获取列名
-        columns = [desc[0] for desc in cursor.description]
-        # 获取所有数据
-        rows = cursor.fetchall()
-        # 转成 DataFrame
-        df = pd.DataFrame(rows, columns=columns)
+            print("query_sql", query_sql)
+            cursor.execute(query_sql)
+            # 获取列名
+            columns = [desc[0] for desc in cursor.description]
+            all_rows.extend(cursor.fetchall())
         cursor.close()
         conn.close()
+        # 转成 DataFrame
+        df = pd.DataFrame(all_rows, columns=columns)
         df[json_feature] = df[json_feature].apply(lambda x: json.loads(x) if isinstance(x, str) else x)
         df_features = pd.json_normalize(df[json_feature])
         if 'forecastDatatime' not in df_features.columns:
+            send_message('获取曦谋数据组件', farm_id, f"{moment}时刻数据缺失!")
             return "The returned data does not contain the forecastDatetime column — the data might be empty or null!"
         else:
             df_features['date_time'] = pd.to_datetime(df_features['forecastDatatime'], unit='ms', utc=True).dt.tz_convert(
@@ -816,14 +814,13 @@ def get_xmo_data_from_hive(args):
 
 
 
-if __name__ == "__main__":
-    print("Program starts execution!")
-    args = {
-        # 'moment': '06',
-        'current_date': '2025060901',
-        'farm_id': 'J00883',
-        'days': '13'
-    }
-    get_xmo_data_from_hive(args)
-
-    print("server start!")
+# if __name__ == "__main__":
+#     print("Program starts execution!")
+#     args = {
+#         'moment': '06',
+#         'current_date': '2025060901',
+#         'farm_id': 'J008809',
+#         'days': '13'
+#     }
+#     print(get_xmo_data_from_hive(args).head(10))
+#     print("server start!")