Преглед изворног кода

修改曦谋获取数据源逻辑

anweiguo пре 2 дана
родитељ
комит
6e005c94a7
1 измењен фајл са 16 додатих и 11 уклоњених редова
  1. 16 11
      common/database_dml.py

+ 16 - 11
common/database_dml.py

@@ -785,22 +785,27 @@ def get_xmo_data_from_hive(args):
         conn = jaydebeapi.connect(driver_class, jdbc_url, [user, password], jar_file)
         # 查询 Hive 表
         cursor = conn.cursor()
-        all_rows = []
-
+        query_sql = ""
         for i in range(0, days):
             sysdate_pre = (current_date + timedelta(days=i)).strftime("%Y%m%d")
-            query_sql = """select rowkey,datatimestamp,{2} from hbase_forecast.forecast_xmo_d{3}
-                                                     where rowkey>='{0}-{1}0000' and rowkey<='{0}-{1}2345'""".format(
+            if i == 0:
+                pass
+            else:
+                query_sql += "union all\n"
+
+            query_sql += """select rowkey,datatimestamp,{2} from hbase_forecast.forecast_xmo_d{3}
+                                                            where rowkey>='{0}-{1}0000' and rowkey<='{0}-{1}2345' \n""".format(
                 farm_id, sysdate_pre, json_feature, i)
-            print("query_sql", query_sql)
-            cursor.execute(query_sql)
-            # 获取列名
-            columns = [desc[0] for desc in cursor.description]
-            all_rows.extend(cursor.fetchall())
+        print("query_sql\n", query_sql)
+        cursor.execute(query_sql)
+        # 获取列名
+        columns = [desc[0] for desc in cursor.description]
+        # 获取所有数据
+        rows = cursor.fetchall()
+        # 转成 DataFrame
+        df = pd.DataFrame(rows, columns=columns)
         cursor.close()
         conn.close()
-        # 转成 DataFrame
-        df = pd.DataFrame(all_rows, columns=columns)
         df[json_feature] = df[json_feature].apply(lambda x: json.loads(x) if isinstance(x, str) else x)
         df_features = pd.json_normalize(df[json_feature])
         if 'forecastDatatime' not in df_features.columns: