mongo.py 2.4 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. #!/usr/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # @FileName :compare.py
  4. # @Time :2025/1/20 15:57
  5. # @Author :David
  6. # @Company: shenyang JY
  7. from pymongo import MongoClient, UpdateOne
  8. import pandas as pd
  9. import os
  10. import numpy as np
  11. np.random.seed(42)
  12. current_path = os.path.dirname(__file__)
  13. current_path = os.path.dirname(__file__)
  14. def insert_data_into_mongo(res_df, args):
  15. mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
  16. mongodb_database = args['mongodb_database']
  17. mongodb_write_table = args['mongodb_write_table']
  18. overwrite = 1
  19. update_keys = None
  20. if 'overwrite' in args.keys():
  21. overwrite = int(args['overwrite'])
  22. if 'update_keys' in args.keys():
  23. update_keys = args['update_keys'].split(',')
  24. client = MongoClient(mongodb_connection)
  25. db = client[mongodb_database]
  26. collection = db[mongodb_write_table]
  27. # 覆盖模式:删除现有集合
  28. if overwrite:
  29. if mongodb_write_table in db.list_collection_names():
  30. collection.drop()
  31. print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")
  32. # 将 DataFrame 转为字典格式
  33. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  34. # 如果没有数据,直接返回
  35. if not data_dict:
  36. print("No data to insert.")
  37. return
  38. # 如果指定了 update_keys,则执行 upsert(更新或插入)
  39. if update_keys and not overwrite:
  40. operations = []
  41. for record in data_dict:
  42. # 构建查询条件,用于匹配要更新的文档
  43. query = {key: record[key] for key in update_keys}
  44. operations.append(UpdateOne(query, {'$set': record}, upsert=True))
  45. # 批量执行更新/插入操作
  46. if operations:
  47. result = collection.bulk_write(operations)
  48. print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
  49. else:
  50. # 追加模式:直接插入新数据
  51. collection.insert_many(data_dict)
  52. print("Data inserted successfully!")
  53. if __name__ == "__main__":
  54. pre_data = pd.read_csv('./data/测试集1.csv')
  55. pre_data = pre_data.loc[:, ['C_TIME', 'dq_fix', 'C_FP_VALUE', 'history', 'coe-acc', 'coe-ass', 'howLongAgo']]
  56. df_melted = pre_data.melt(id_vars=['C_TIME', 'howLongAgo'], var_name='model', value_name='power_forecast')
  57. df_melted['farm_id'] = 'J00000'
  58. pass