processing_limit_power_by_agcavc.py 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import argparse
  2. import pandas as pd
  3. from pymongo import MongoClient
  4. from sqlalchemy import create_engine
  5. from flask import Flask,request,jsonify
  6. from waitress import serve
  7. import time
  8. import logging
  9. import traceback
  10. app = Flask('processing_limit_power_by_agcavc——service')
  11. @app.route('/hello', methods=['POST','GET'])
  12. def hello():
  13. return jsonify(message='Hello, World!')
  14. def get_data_from_mongo(args):
  15. mongodb_connection,mongodb_database,mongodb_read_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_read_table']
  16. client = MongoClient(mongodb_connection)
  17. # 选择数据库(如果数据库不存在,MongoDB 会自动创建)
  18. db = client[mongodb_database]
  19. collection = db[mongodb_read_table] # 集合名称
  20. data_from_db = collection.find() # 这会返回一个游标(cursor)
  21. # 将游标转换为列表,并创建 pandas DataFrame
  22. df = pd.DataFrame(list(data_from_db))
  23. client.close()
  24. return df
  25. def insert_data_into_mongo(res_df,args):
  26. mongodb_connection,mongodb_database,mongodb_write_table = "mongodb://root:sdhjfREWFWEF23e@192.168.1.43:30000/",args['mongodb_database'],args['mongodb_write_table']
  27. client = MongoClient(mongodb_connection)
  28. db = client[mongodb_database]
  29. if mongodb_write_table in db.list_collection_names():
  30. db[mongodb_write_table].drop()
  31. print(f"Collection '{mongodb_write_table} already exist, deleted successfully!")
  32. collection = db[mongodb_write_table] # 集合名称
  33. # 将 DataFrame 转为字典格式
  34. data_dict = res_df.to_dict("records") # 每一行作为一个字典
  35. # 插入到 MongoDB
  36. collection.insert_many(data_dict)
  37. print("data inserted successfully!")
  38. #1.AGC/AVC信号判断限电(有的场站准 有的不准) 1种方法 数据库数据有问题 暂时用不了
  39. def agc_avc_judgement(power_df,args):
  40. timeBegin,timeEnd,col_time,mysql_connection,avc_table = args['timeBegin'], args['timeEnd'],args['col_time'],args['mysql_connection'],args['agc_avc_table']
  41. #限电记录
  42. clean_record = []
  43. # 创建连接
  44. # cnx = mysql.connector.connect(user=user,password=password,host=host,port=port,database=database)
  45. # # 创建一个游标对象
  46. # cursor = cnx.cursor()
  47. engine = create_engine(mysql_connection)
  48. # 定义SQL查询
  49. df = pd.read_sql_query(f"select C_TIME AS {col_time}, 1 as agc_avc_limit from {avc_table} where C_TIME>='{timeBegin} 00:00:00' and C_TIME<='{timeEnd} 23:59:59' and (C_IS_RATIONING_BY_AUTO_CONTROL is True or C_IS_RATIONING_BY_MANUAL_CONTROL=1)", engine)
  50. df[col_time] = pd.to_datetime(df[col_time]).dt.strftime('%Y-%m-%d %H:%M:%S')
  51. if df.shape[0]>0:
  52. print(f"根据限电记录清洗,{timeBegin}至{timeEnd},限电时长共计{round(df.shape[0]/60,2)}小时")
  53. clean_record = df[col_time].tolist()
  54. # 关闭游标和连接
  55. # cursor.close()
  56. # cnx.close()
  57. print(power_df.columns)
  58. power_df = pd.merge(power_df,df,on=col_time,how='left')
  59. return power_df[power_df['agc_avc_limit']!=1].drop('agc_avc_limit',axis=1)
  60. @app.route('/processing_limit_power_by_agcavc', methods=['POST','GET'])
  61. def processing_limit_power_by_agcavc():
  62. # 获取程序开始时间
  63. start_time = time.time()
  64. result = {}
  65. success = 0
  66. print("Program starts execution!")
  67. try:
  68. args = request.values.to_dict()
  69. print('args',args)
  70. logger.info(args)
  71. power_df = get_data_from_mongo(args)
  72. res_df = agc_avc_judgement(power_df,args)
  73. insert_data_into_mongo(res_df,args)
  74. success = 1
  75. except Exception as e:
  76. my_exception = traceback.format_exc()
  77. my_exception.replace("\n","\t")
  78. result['msg'] = my_exception
  79. end_time = time.time()
  80. result['success'] = success
  81. result['args'] = args
  82. result['start_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(start_time))
  83. result['end_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(end_time))
  84. print("Program execution ends!")
  85. return result
  86. if __name__=="__main__":
  87. print("Program starts execution!")
  88. logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  89. logger = logging.getLogger("processing_limit_power_by_agcavc")
  90. from waitress import serve
  91. serve(app, host="0.0.0.0", port=10086)
  92. print("server start!")