#!/usr/bin/env python
# -*- coding:utf-8 -*-
# @FileName :compare.py
# @Time     :2025/1/20 15:57
# @Author   :David
# @Company  :shenyang JY
from pymongo import MongoClient, UpdateOne
import pandas as pd
import os
import numpy as np

np.random.seed(42)
current_path = os.path.dirname(__file__)


def insert_data_into_mongo(res_df, args):
    mongodb_connection = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/"
    mongodb_database = args['mongodb_database']
    mongodb_write_table = args['mongodb_write_table']
    overwrite = 1
    update_keys = None
    if 'overwrite' in args.keys():
        overwrite = int(args['overwrite'])
    if 'update_keys' in args.keys():
        update_keys = args['update_keys'].split(',')

    client = MongoClient(mongodb_connection)
    db = client[mongodb_database]
    collection = db[mongodb_write_table]

    # Overwrite mode: drop the existing collection before writing
    if overwrite:
        if mongodb_write_table in db.list_collection_names():
            collection.drop()
            print(f"Collection '{mongodb_write_table}' already exists, deleted successfully!")

    # Convert the DataFrame to a list of dicts, one dict per row
    data_dict = res_df.to_dict("records")

    # Nothing to insert, return early
    if not data_dict:
        print("No data to insert.")
        return

    # If update_keys is given (and we are not overwriting), upsert (update or insert)
    if update_keys and not overwrite:
        operations = []
        for record in data_dict:
            # Build the query that identifies the document to update
            query = {key: record[key] for key in update_keys}
            operations.append(UpdateOne(query, {'$set': record}, upsert=True))

        # Execute the updates/inserts as one bulk write
        if operations:
            result = collection.bulk_write(operations)
            print(f"Matched: {result.matched_count}, Upserts: {result.upserted_count}")
    else:
        # Append mode: insert the new documents directly
        collection.insert_many(data_dict)
        print("Data inserted successfully!")


if __name__ == "__main__":
    pre_data = pd.read_csv('./data/测试集1.csv')
    pre_data = pre_data.loc[:, ['C_TIME', 'dq_fix', 'C_FP_VALUE', 'history', 'coe-acc', 'coe-ass', 'howLongAgo']]
    # Reshape from wide (one forecast column per model) to long: one row per (time, model) pair
    df_melted = pre_data.melt(id_vars=['C_TIME', 'howLongAgo'], var_name='model', value_name='power_forecast')
    df_melted['farm_id'] = 'J00000'
    pass
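
    # Sketch (not in the original script): how the melted frame might be written back
    # with insert_data_into_mongo. The database name, collection name, and update keys
    # below are placeholders chosen for illustration, not values taken from the source.
    # write_args = {
    #     'mongodb_database': 'forecast_db',            # placeholder database name
    #     'mongodb_write_table': 'model_compare',       # placeholder collection name
    #     'overwrite': 0,                               # append/upsert instead of dropping
    #     'update_keys': 'C_TIME,model,farm_id',        # assumed upsert key per (time, model, farm)
    # }
    # insert_data_into_mongo(df_melted, write_args)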