import pandas as pd from pymongo import MongoClient from sqlalchemy import create_engine from flask import Flask,request,jsonify import time import logging import traceback from functools import reduce app = Flask('mysql_to_mongo——service') @app.route('/hello', methods=['POST']) def hello(): return jsonify(message='Hello, World!') def get_data_fromMysql(params): mysql_conn = params['mysql_conn'] query_sql = params['query_sql'] #数据库读取实测气象 engine = create_engine(f"mysql+pymysql://{mysql_conn}") # 定义SQL查询 env_df = pd.read_sql_query(query_sql, engine) return env_df def insert_data_into_mongo(res_df,args): mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table'] client = MongoClient(mongodb_connection) db = client[mongodb_database] if mongodb_write_table in db.list_collection_names(): db[mongodb_write_table].drop() print(f"Collection '{mongodb_write_table} already exist, deleted successfully!") collection = db[mongodb_write_table] # 集合名称 # 将 DataFrame 转为字典格式 data_dict = res_df.to_dict("records") # 每一行作为一个字典 # 插入到 MongoDB collection.insert_many(data_dict) print("data inserted successfully!") @app.route('/mysql_to_mongo', methods=['POST']) def data_join(): # 获取程序开始时间 start_time = time.time() result = {} success = 0 print("Program starts execution!") try: args = request.values.to_dict() print('args',args) logger.info(args) df_mysql = get_data_fromMysql(args) insert_data_into_mongo(df_mysql, args) success = 1 except Exception as e: my_exception = traceback.format_exc() my_exception.replace("\n","\t") result['msg'] = my_exception end_time = time.time() result['success'] = success result['args'] = args result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time)) result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time)) print("Program execution ends!") return result if __name__=="__main__": print("Program starts execution!") logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s') logger = logging.getLogger("mysql_to_mongo") from waitress import serve serve(app, host="0.0.0.0", port=10095) print("server start!")