David 1 月之前
父節點
當前提交
f1dcb59768
共有 1 個文件被更改,包括 63 次插入2 次删除
  1. 63 2
      DataBase/turbine-cluster/z_power.py

+ 63 - 2
DataBase/turbine-cluster/z_power.py

@@ -27,14 +27,75 @@ def zone_powers(input_dir):
         dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv")) for z in turbineloc if id_names[z] in turbines]
         z_power['C_TIME'] = dfs[0]['C_TIME']
         sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1)
-        z_power[zone] = sum_power/1000
+        z_power['z'+str(zone)] = sum_power/1000
     z_power = pd.DataFrame(z_power)
     z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
     z_power.to_csv("../../cluster/260/z-power.csv", index=False)
+    return z_power
 
+from pymongo import MongoClient, UpdateOne
+def insert_data_into_mongo(res_df, args):
+    """
+    插入数据到 MongoDB 集合中,可以选择覆盖、追加或按指定的 key 进行更新插入。
 
+    参数:
+    - res_df: 要插入的 DataFrame 数据
+    - args: 包含 MongoDB 数据库和集合名称的字典
+    - overwrite: 布尔值,True 表示覆盖,False 表示追加
+    - update_keys: 列表,指定用于匹配的 key 列,如果存在则更新,否则插入 'col1','col2'
+    """
+    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
+    mongodb_database = args['mongodb_database']
+    mongodb_write_table = args['mongodb_write_table']
+    overwrite = 1
+    update_keys = None
+    if 'overwrite' in args.keys():
+        overwrite = int(args['overwrite'])
+    if 'update_keys' in args.keys():
+        update_keys = args['update_keys'].split(',')
+
+    client = MongoClient(mongodb_connection)
+    db = client[mongodb_database]
+    collection = db[mongodb_write_table]
+
+    # 覆盖模式:删除现有集合
+    if overwrite:
+        if mongodb_write_table in db.list_collection_names():
+            collection.drop()
+            print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
+
+    # 将 DataFrame 转为字典格式
+    data_dict = res_df.to_dict("records")  # 每一行作为一个字典
+
+    # 如果没有数据,直接返回
+    if not data_dict:
+        print("No data to insert.")
+        return
+
+    # 如果指定了 update_keys,则执行 upsert(更新或插入)
+    if update_keys and not overwrite:
+        operations = []
+        for record in data_dict:
+            # 构建查询条件,用于匹配要更新的文档
+            query = {key: record[key] for key in update_keys}
+            operations.append(UpdateOne(query, {'$set': record}, upsert=True))
+
+        # 批量执行更新/插入操作
+        if operations:
+            result = collection.bulk_write(operations)
+            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
+    else:
+        # 追加模式:直接插入新数据
+        collection.insert_many(data_dict)
+        print("Data inserted successfully!")
 
 if __name__ == "__main__":
     import pandas as pd
     # turbine_cls_adjust(turbine_cls)
-    zone_powers("../../cluster/260/")
+    z_power = zone_powers("../../cluster/260/")
+    params = {
+        'mongodb_write_table': 'j00260_zone',
+        'mongodb_database': 'ldw_nwp',
+
+    }
+    insert_data_into_mongo(z_power, params)