# model_prediction_lightgbm.py (3.7 KB)
import logging
import os
import pickle
import time
import traceback

import pandas as pd
from flask import Flask, request
from pymongo import MongoClient
# Flask application object; the route handler defined further down is
# registered on it and the __main__ block serves it.
# NOTE(review): the application name contains two em-dash characters (U+2014),
# possibly an input-method artifact for "__" or "--" -- confirm before changing.
app = Flask('model_prediction_lightgbm——service')
  9. def get_data_from_mongo(args):
  10. mongodb_connection,mongodb_database,mongodb_read_table,timeBegin,timeEnd = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table'],args['timeBegin'],args['timeEnd']
  11. client = MongoClient(mongodb_connection)
  12. # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
  13. db = client[mongodb_database]
  14. collection = db[mongodb_read_table] # 集合名称
  15. query = {"dateTime": {"$gte": timeBegin, "$lte": timeEnd}}
  16. cursor = collection.find(query)
  17. data = list(cursor)
  18. df = pd.DataFrame(data)
  19. # 4. 删除 _id 字段(可选)
  20. if '_id' in df.columns:
  21. df = df.drop(columns=['_id'])
  22. client.close()
  23. return df
  24. def insert_data_into_mongo(res_df,args):
  25. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  26. client = MongoClient(mongodb_connection)
  27. db = client[mongodb_database]
  28. if mongodb_write_table in db.list_collection_names():
  29. db[mongodb_write_table].drop()
  30. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  31. collection = db[mongodb_write_table] # 集合名称
  32. # 将 DataFrame 转为字典格式
  33. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  34. # 插入到 MongoDB
  35. collection.insert_many(data_dict)
  36. print("data inserted successfully!")
  37. def model_prediction(df,args):
  38. mongodb_connection,mongodb_database,mongodb_model_table,model_name = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_model_table'],args['model_name']
  39. client = MongoClient(mongodb_connection)
  40. db = client[mongodb_database]
  41. collection = db[mongodb_model_table]
  42. model_data = collection.find_one({"model_name": model_name})
  43. if model_data is not None:
  44. model_binary = model_data['model'] # 确保这个字段是存储模型的二进制数据
  45. # 反序列化模型
  46. model = pickle.loads(model_binary)
  47. df['predict'] = model.predict(df[model.feature_name()])
  48. print("model predict result successfully!")
  49. return df
  50. @app.route('/model_prediction_lightgbm', methods=['POST'])
  51. def model_prediction_lightgbm():
  52. # 获取程序开始时间
  53. start_time = time.time()
  54. result = {}
  55. success = 0
  56. print("Program starts execution!")
  57. try:
  58. args = request.values.to_dict()
  59. print('args',args)
  60. logger.info(args)
  61. power_df = get_data_from_mongo(args)
  62. model = model_prediction(power_df,args)
  63. insert_data_into_mongo(model,args)
  64. success = 1
  65. except Exception as e:
  66. my_exception = traceback.format_exc()
  67. my_exception.replace("\n","\t")
  68. result['msg'] = my_exception
  69. end_time = time.time()
  70. result['success'] = success
  71. result['args'] = args
  72. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  73. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  74. print("Program execution ends!")
  75. return result
  76. if __name__=="__main__":
  77. print("Program starts execution!")
  78. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  79. logger = logging.getLogger("model_prediction_lightgbm log")
  80. from waitress import serve
  81. serve(app, host="0.0.0.0", port=10090)
  82. print("server start!")