#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :z_power.py
# @Time :2024/12/30 14:16
# @Author :David
# @Company: shenyang JY
import os
import pickle

import pandas as pd
from pymongo import MongoClient, UpdateOne

# Turbine ids 76-150 map one-to-one onto the cluster names G01-G75.
c_names = [f'G{x:02d}' for x in range(1, 76)]
turbineloc = list(range(76, 151))
id_names = {tid: c_names[i] for i, tid in enumerate(turbineloc)}
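# Sanity check of the mapping, derived from the two lists above:
#   id_names[76]  == 'G01'
#   id_names[150] == 'G75'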
path = '../../cluster/260/turbine_cls.pickle'
with open(path, 'rb') as f:
    turbine_cls = pickle.load(f)


def turbine_cls_adjust(turbine_cls):
    """Manually move G54 from cluster 2 to cluster 1, add G14 to cluster 2,
    drop cluster 3, and persist the result back to the pickle."""
    turbine_cls[2] = [x for x in turbine_cls[2] if x != 'G54']
    turbine_cls[1].append('G54')
    turbine_cls[2].append('G14')
    del turbine_cls[3]
    with open(path, 'wb') as file:
        pickle.dump(turbine_cls, file)
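
# Note: turbine_cls_adjust rewrites turbine_cls.pickle in place; it is left
# commented out in __main__ below, presumably so the adjustment runs only once.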


def zone_powers(input_dir):
    """Sum per-turbine active power into one column per zone and write the result to CSV."""
    z_power = {}
    for zone, turbines in turbine_cls.items():
        dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv"))
               for z in turbineloc if id_names[z] in turbines]
        z_power['C_TIME'] = dfs[0]['C_TIME']
        sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1)
        z_power['z' + str(zone)] = sum_power / 1000  # scale by 1/1000 (kW -> MW, assuming the raw power is in kW)
    z_power = pd.DataFrame(z_power)
    z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
    z_power.to_csv("../../cluster/260/z-power.csv", index=False)
    return z_power
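
# A minimal sketch (assumed usage) of inspecting the written file: the frame
# carries one C_TIME column plus one 'z<zone>' column per key in turbine_cls.
#   df = pd.read_csv("../../cluster/260/z-power.csv")
#   print(df.columns)  # e.g. Index(['C_TIME', 'z1', 'z2', ...])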


def insert_data_into_mongo(res_df, args):
    """
    Insert a DataFrame into a MongoDB collection, either overwriting the
    collection, appending to it, or upserting on the given key columns.

    Parameters:
    - res_df: DataFrame to insert
    - args: dict with the MongoDB database and collection names; optional keys:
      - 'overwrite': truthy drops the collection first, falsy appends
      - 'update_keys': comma-separated key columns used for matching,
        e.g. 'col1,col2'; matched documents are updated, the rest inserted
    """
    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']
    overwrite = int(args.get('overwrite', 1))  # default: overwrite
    update_keys = args['update_keys'].split(',') if 'update_keys' in args else None
    client = MongoClient(mongodb_connection)
    db = client[mongodb_database]
    collection = db[mongodb_write_table]
    # Overwrite mode: drop the existing collection first
    if overwrite:
        if mongodb_write_table in db.list_collection_names():
            collection.drop()
            print(f"Collection '{mongodb_write_table}' already existed and was dropped.")
    # Convert the DataFrame to a list of dicts, one per row
    data_dict = res_df.to_dict("records")
    # Nothing to insert
    if not data_dict:
        print("No data to insert.")
        return
    # If update_keys is given (and we are not overwriting), upsert:
    # update matching documents and insert the rest
    if update_keys and not overwrite:
        operations = []
        for record in data_dict:
            # Build the filter that identifies the document to update
            query = {key: record[key] for key in update_keys}
            operations.append(UpdateOne(query, {'$set': record}, upsert=True))
        # Execute all updates/inserts in one bulk write
        if operations:
            result = collection.bulk_write(operations)
            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
    else:
        # Overwrite/append mode: insert the new documents directly
        collection.insert_many(data_dict)
        print("Data inserted successfully!")


if __name__ == "__main__":
    # turbine_cls_adjust(turbine_cls)
    z_power = zone_powers("../../cluster/260/")
    params = {
        'mongodb_write_table': 'j00260_zone',
        'mongodb_database': 'ldw_nwp',
    }
    insert_data_into_mongo(z_power, params)
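    # A minimal sketch of the upsert path instead of a full overwrite; the
    # 'C_TIME' match key is an assumption, not taken from this script:
    # insert_data_into_mongo(z_power, {
    #     'mongodb_write_table': 'j00260_zone',
    #     'mongodb_database': 'ldw_nwp',
    #     'overwrite': 0,
    #     'update_keys': 'C_TIME',
    # })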