#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @FileName :z_power.py
# @Time     :2024/12/30 14:16
# @Author   :David
# @Company  :shenyang JY
"""Aggregate per-turbine active power into per-zone totals and store the
result as a CSV and (optionally) in a MongoDB collection."""
import os
import pickle

import pandas as pd  # fix: was imported only under __main__, but zone_powers() uses it
from pymongo import MongoClient, UpdateOne

# Turbine short names G01..G75, mapped from the raw turbine ids 76..150.
c_names = [f'G{x:02d}' for x in range(1, 76)]
turbineloc = list(range(76, 151))
# fix: the original comprehension bound the loop variable to the name `id`,
# shadowing the builtin; renamed to `tid`.
id_names = {tid: c_names[i] for i, tid in enumerate(turbineloc)}

# Module-level side effect kept from the original: the cluster assignment
# (zone -> list of turbine short names, presumably — confirm against the
# pickle's producer) is loaded once at import time and used by zone_powers().
path = '../../cluster/260/turbine_cls.pickle'
with open(path, 'rb') as f:
    turbine_cls = pickle.load(f)


def turbine_cls_adjust(turbine_cls):
    """Apply a one-off manual adjustment to the cluster assignment and
    persist it back to `path`.

    Moves 'G54' from cluster 2 to cluster 1, adds 'G14' to cluster 2 and
    removes cluster 3 entirely. Mutates the argument in place.
    """
    turbine_cls[2] = [x for x in turbine_cls[2] if x != 'G54']
    turbine_cls[1].append('G54')
    turbine_cls[2].append('G14')
    del turbine_cls[3]
    with open(path, 'wb') as file:
        pickle.dump(turbine_cls, file)


def zone_powers(input_dir):
    """Sum per-turbine active power into one column per zone.

    Reads `turbine-<id>.csv` from `input_dir` for every turbine belonging to
    each zone in the module-level `turbine_cls`, sums their
    'C_ACTIVE_POWER' columns row-wise (converted from kW to MW by the /1000,
    presumably — confirm units with the data producer), rounds the power
    columns to 2 decimals, writes the result to
    '../../cluster/260/z-power.csv' and returns it as a DataFrame with a
    leading 'C_TIME' column.
    """
    z_power = {}
    for zone, turbines in turbine_cls.items():
        # Load every turbine CSV that belongs to this zone.
        dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv"))
               for z in turbineloc if id_names[z] in turbines]
        # NOTE(review): assumes every zone is non-empty and all files share
        # the same timestamp column — dfs[0] raises IndexError otherwise.
        z_power['C_TIME'] = dfs[0]['C_TIME']
        sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs],
                              ignore_index=True, axis=1).sum(axis=1)
        z_power['z' + str(zone)] = sum_power / 1000
    z_power = pd.DataFrame(z_power)
    # Round only the power columns; column 0 is the timestamp.
    z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
    z_power.to_csv("../../cluster/260/z-power.csv", index=False)
    return z_power


def insert_data_into_mongo(res_df, args):
    """Insert a DataFrame into a MongoDB collection, with optional overwrite
    or key-based upsert semantics.

    Parameters:
    - res_df: DataFrame whose rows become MongoDB documents.
    - args: dict containing:
        - 'mongodb_database' / 'mongodb_write_table': target database and
          collection names.
        - 'overwrite' (optional, default 1): truthy -> drop any existing
          collection before inserting.
        - 'update_keys' (optional): comma-separated column names, e.g.
          'col1,col2'; when given (and not overwriting), documents matching
          on those keys are updated, others inserted (upsert).
    """
    # SECURITY NOTE(review): credentials are hard-coded in source; move the
    # connection string to configuration or an environment variable.
    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']
    # fix: replaced `in args.keys()` checks with dict.get / membership idiom.
    overwrite = int(args.get('overwrite', 1))
    update_keys = args['update_keys'].split(',') if 'update_keys' in args else None

    client = MongoClient(mongodb_connection)
    try:
        db = client[mongodb_database]
        collection = db[mongodb_write_table]

        # Overwrite mode: drop the existing collection first.
        if overwrite:
            if mongodb_write_table in db.list_collection_names():
                collection.drop()
                print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")

        # One dict per DataFrame row.
        data_dict = res_df.to_dict("records")
        if not data_dict:
            print("No data to insert.")
            return

        if update_keys and not overwrite:
            # Upsert path: match on the key columns, update or insert.
            operations = [
                UpdateOne({key: record[key] for key in update_keys},
                          {'$set': record},
                          upsert=True)
                for record in data_dict
            ]
            if operations:
                result = collection.bulk_write(operations)
                print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
        else:
            # Append mode: plain bulk insert.
            collection.insert_many(data_dict)
            print("Data inserted successfully!")
    finally:
        # fix: the original never closed the client (connection leak).
        client.close()


if __name__ == "__main__":
    # turbine_cls_adjust(turbine_cls)
    z_power = zone_powers("../../cluster/260/")
    params = {
        'mongodb_write_table': 'j00260_zone',
        'mongodb_database': 'ldw_nwp',
    }
    insert_data_into_mongo(z_power, params)