#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :z_power.py
# @Time :2024/12/30 14:16
# @Author :David
# @Company: shenyang JY
import os
import pickle

import pandas as pd
from pymongo import MongoClient, UpdateOne

# Map turbine file ids 76..150 onto the zero-padded turbine names G01..G75.
c_names = [f'G{x:02d}' for x in range(1, 76)]
turbineloc = list(range(76, 151))
id_names = {tid: name for tid, name in zip(turbineloc, c_names)}
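# Example: id_names[76] == 'G01', id_names[77] == 'G02', ..., id_names[150] == 'G75'.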
# turbine_cls maps each cluster (zone) id to the list of turbine names in that zone.
path = '../../cluster/260/turbine_cls.pickle'
with open(path, 'rb') as f:
    turbine_cls = pickle.load(f)


def turbine_cls_adjust(turbine_cls):
    """Move G54 from cluster 2 to cluster 1, add G14 to cluster 2, drop cluster 3, then persist the result."""
    turbine_cls[2] = [x for x in turbine_cls[2] if x != 'G54']
    turbine_cls[1].append('G54')
    turbine_cls[2].append('G14')
    del turbine_cls[3]
    with open(path, 'wb') as file:
        pickle.dump(turbine_cls, file)
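# Note: turbine_cls_adjust mutates the pickle in place; a second run would raise
# KeyError on `del turbine_cls[3]` once cluster 3 is gone, which is presumably why
# the call in __main__ below stays commented out.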


def zone_powers(input_dir):
    z_power = {}
    for zone, turbines in turbine_cls.items():
        # Load the per-turbine CSVs that belong to this zone.
        dfs = [pd.read_csv(os.path.join(input_dir, f"turbine-{z}.csv")) for z in turbineloc if id_names[z] in turbines]
        z_power['C_TIME'] = dfs[0]['C_TIME']
        # Sum active power across the zone's turbines at each timestamp.
        sum_power = pd.concat([df['C_ACTIVE_POWER'] for df in dfs], ignore_index=True, axis=1).sum(axis=1)
        z_power['z' + str(zone)] = sum_power / 1000  # rescale (kW -> MW, assuming the source data are in kW)
    z_power = pd.DataFrame(z_power)
    z_power.iloc[:, 1:] = z_power.iloc[:, 1:].round(2)
    z_power.to_csv("../../cluster/260/z-power.csv", index=False)
    return z_power
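# The returned frame holds 'C_TIME' plus one 'z<id>' column per cluster key in
# turbine_cls (e.g. z1, z2 once turbine_cls_adjust has dropped cluster 3), with the
# power columns rounded to two decimals.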


def insert_data_into_mongo(res_df, args):
    """
    Insert data into a MongoDB collection, with a choice of overwriting, appending,
    or upserting on the specified key columns.
    Parameters:
    - res_df: DataFrame holding the rows to insert
    - args: dict with the MongoDB database and collection names
    - overwrite: boolean; True overwrites the collection, False appends
    - update_keys: key columns used for matching, e.g. 'col1,col2'; matching documents are updated, the rest inserted
    """
    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']
    overwrite = 1
    update_keys = None
    if 'overwrite' in args:
        overwrite = int(args['overwrite'])
    if 'update_keys' in args:
        update_keys = args['update_keys'].split(',')
    client = MongoClient(mongodb_connection)
    db = client[mongodb_database]
    collection = db[mongodb_write_table]
    # Overwrite mode: drop the existing collection
    if overwrite:
        if mongodb_write_table in db.list_collection_names():
            collection.drop()
            print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
    # Convert the DataFrame to a list of per-row dicts
    data_dict = res_df.to_dict("records")
    # Return early if there is nothing to insert
    if not data_dict:
        print("No data to insert.")
        return
    # If update_keys is given, upsert (update on match, insert otherwise)
    if update_keys and not overwrite:
        operations = []
        for record in data_dict:
            # Build the filter that matches the document to update
            query = {key: record[key] for key in update_keys}
            operations.append(UpdateOne(query, {'$set': record}, upsert=True))
        # Run the updates/inserts as one bulk write
        if operations:
            result = collection.bulk_write(operations)
            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
    else:
        # Append mode: insert the new rows directly
        collection.insert_many(data_dict)
        print("Data inserted successfully!")
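
# A hedged usage sketch of the upsert path (not executed here): passing 'overwrite': 0
# together with 'update_keys' routes the call through bulk UpdateOne upserts; keying
# on C_TIME is an assumption, valid only if C_TIME uniquely identifies a row.
# upsert_params = {
#     'mongodb_database': 'ldw_nwp',
#     'mongodb_write_table': 'j00260_zone',
#     'overwrite': 0,
#     'update_keys': 'C_TIME',
# }
# insert_data_into_mongo(z_power, upsert_params)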


if __name__ == "__main__":
    # turbine_cls_adjust(turbine_cls)
    z_power = zone_powers("../../cluster/260/")
    params = {
        'mongodb_write_table': 'j00260_zone',
        'mongodb_database': 'ldw_nwp',
    }
    insert_data_into_mongo(z_power, params)