# mysql_to_mongo.py (2.5 KB)
  1. import pandas as pd
  2. from pymongo import MongoClient
  3. from sqlalchemy import create_engine
  4. from flask import Flask,request,jsonify
  5. import time
  6. import logging
  7. import traceback
  8. app = Flask('mysql_to_mongo——service')
  9. @app.route('/hello', methods=['POST'])
  10. def hello():
  11. return jsonify(message='Hello, World!')
  12. def get_data_fromMysql(params):
  13. mysql_conn = params['mysql_conn']
  14. query_sql = params['query_sql']
  15. #数据库读取实测气象
  16. engine = create_engine(f"mysql+pymysql://{mysql_conn}")
  17. # 定义SQL查询
  18. env_df = pd.read_sql_query(query_sql, engine)
  19. return env_df
  20. def insert_data_into_mongo(res_df,args):
  21. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  22. client = MongoClient(mongodb_connection)
  23. db = client[mongodb_database]
  24. if mongodb_write_table in db.list_collection_names():
  25. db[mongodb_write_table].drop()
  26. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  27. collection = db[mongodb_write_table] # 集合名称
  28. # 将 DataFrame 转为字典格式
  29. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  30. # 插入到 MongoDB
  31. collection.insert_many(data_dict)
  32. print("data inserted successfully!")
  33. @app.route('/mysql_to_mongo', methods=['POST'])
  34. def data_join():
  35. # 获取程序开始时间
  36. start_time = time.time()
  37. result = {}
  38. success = 0
  39. print("Program starts execution!")
  40. try:
  41. args = request.values.to_dict()
  42. print('args',args)
  43. logger.info(args)
  44. df_mysql = get_data_fromMysql(args)
  45. insert_data_into_mongo(df_mysql, args)
  46. success = 1
  47. except Exception as e:
  48. my_exception = traceback.format_exc()
  49. my_exception.replace("\n","\t")
  50. result['msg'] = my_exception
  51. end_time = time.time()
  52. result['success'] = success
  53. result['args'] = args
  54. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  55. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  56. print("Program execution ends!")
  57. return result
  58. if __name__=="__main__":
  59. print("Program starts execution!")
  60. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  61. logger = logging.getLogger("mysql_to_mongo")
  62. from waitress import serve
  63. serve(app, host="0.0.0.0", port=10095)
  64. print("server start!")