# mysql_to_mongo.py (2.5 KB)
  1. import pandas as pd
  2. from pymongo import MongoClient
  3. from sqlalchemy import create_engine
  4. from flask import Flask,request,jsonify
  5. import time
  6. import logging
  7. import traceback
  8. from functools import reduce
  9. app = Flask('mysql_to_mongo——service')
  10. @app.route('/hello', methods=['POST'])
  11. def hello():
  12. return jsonify(message='Hello, World!')
  13. def get_data_fromMysql(params):
  14. mysql_conn = params['mysql_conn']
  15. query_sql = params['query_sql']
  16. #数据库读取实测气象
  17. engine = create_engine(f"mysql+pymysql://{mysql_conn}")
  18. # 定义SQL查询
  19. env_df = pd.read_sql_query(query_sql, engine)
  20. return env_df
  21. def insert_data_into_mongo(res_df,args):
  22. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  23. client = MongoClient(mongodb_connection)
  24. db = client[mongodb_database]
  25. if mongodb_write_table in db.list_collection_names():
  26. db[mongodb_write_table].drop()
  27. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  28. collection = db[mongodb_write_table] # 集合名称
  29. # 将 DataFrame 转为字典格式
  30. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  31. # 插入到 MongoDB
  32. collection.insert_many(data_dict)
  33. print("data inserted successfully!")
  34. @app.route('/mysql_to_mongo', methods=['POST'])
  35. def data_join():
  36. # 获取程序开始时间
  37. start_time = time.time()
  38. result = {}
  39. success = 0
  40. print("Program starts execution!")
  41. try:
  42. args = request.values.to_dict()
  43. print('args',args)
  44. logger.info(args)
  45. df_mysql = get_data_fromMysql(args)
  46. insert_data_into_mongo(df_mysql, args)
  47. success = 1
  48. except Exception as e:
  49. my_exception = traceback.format_exc()
  50. my_exception.replace("\n","\t")
  51. result['msg'] = my_exception
  52. end_time = time.time()
  53. result['success'] = success
  54. result['args'] = args
  55. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  56. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  57. print("Program execution ends!")
  58. return result
  59. if __name__=="__main__":
  60. print("Program starts execution!")
  61. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  62. logger = logging.getLogger("mysql_to_mongo")
  63. from waitress import serve
  64. serve(app, host="0.0.0.0", port=10095)
  65. print("server start!")