import os
import gradio as gr
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain_community.embeddings import HuggingFaceEmbeddings
import pymongo
import logging
import nest_asyncio
from langchain.docstore.document import Document
import redis
import asyncio
import threading
import time
# Config
nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)
database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"
# Environment variables
MONGO_URI = os.getenv('MONGO_URI')
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PWD = os.getenv('REDIS_PWD')
# Embedding model
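# bge-large-en-v1.5 produces 1024-dimensional vectors; normalizing embeddings
# is recommended by the model authors when comparing with cosine similarity.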
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True}
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)
# MongoDB connection
connection = pymongo.MongoClient(MONGO_URI)
alert_collection = connection[database][collection]
# Redis connection
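# Note: 16652 is a non-default port (typical of a managed/cloud Redis instance);
# adjust it to match your deployment.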
r = redis.Redis(host=REDIS_HOST, password=REDIS_PWD, port=16652)
# Global variables to store alert information
latest_alert = "No alerts yet."
alert_count = 0
# Preprocessing
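# Decodes a Redis stream entry (bytes -> str) into a one-sentence description.
# The entry is expected to carry these hash fields (as read below):
#   Category, CreatedAt, Acknowledged ("1"/"0"), Remedy, Severity, Source, node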
def create_textual_description(entry_data):
    entry_dict = {k.decode(): v.decode() for k, v in entry_data.items()}
    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]
    description = f"A {severity} alert of category {category} was raised from the {source} source for node {node} at {created_at}. The alert is {acknowledged}. The recommended remedy is: {remedy}."
    return description, entry_dict
# Saving alert doc
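# from_documents() embeds the description with the model above and inserts the
# resulting document into the Atlas collection. The "alert_index" Atlas Vector
# Search index is not created here; it must already exist on the collection for
# later similarity searches to work.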
def save(entry):
    MongoDBAtlasVectorSearch.from_documents(
        documents=[Document(
            page_content=entry["content"],
            metadata=entry["metadata"]
        )],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alert stored successfully!")
# Listening to alert stream
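# Blocking consumer loop: last_id = '$' means only entries added after startup
# are read; XREAD blocks for up to 1000 ms per call before retrying.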
def listen_to_alerts():
    global latest_alert, alert_count
    last_id = '$'
    while True:
        entries = r.xread({stream_name: last_id}, block=1000, count=None)
        if entries:
            stream, new_entries = entries[0]
            for entry_id, entry_data in new_entries:
                description, entry_dict = create_textual_description(entry_data)
                save({
                    "content": description,
                    "metadata": entry_dict
                })
                latest_alert = description
                alert_count += 1
                last_id = entry_id
# Start listening to alerts in a separate thread
threading.Thread(target=listen_to_alerts, daemon=True).start()
# Function to get current stats
def get_current_stats():
    return latest_alert, f"Total Alerts: {alert_count}"
# Gradio interface
def create_interface():
    with gr.Blocks() as iface:
        gr.Markdown("# Alert Monitoring Service")
        with gr.Row():
            latest_alert_md = gr.Markdown("Waiting for alerts...")
        with gr.Row():
            alert_count_md = gr.Markdown("Total Alerts: 0")

        def update_stats():
            # Generator: yield fresh values roughly once per second so the
            # Markdown components keep streaming updates after page load.
            while True:
                time.sleep(1)
                yield get_current_stats()

        # A generator handler and `every=` would conflict (each tick would start
        # a new infinite generator), so the load event relies on the generator alone.
        iface.load(update_stats, None, [latest_alert_md, alert_count_md])
    return iface
# Launch the app
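# queue() is required so the generator-based load event above can stream
# periodic updates to connected clients.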
if __name__ == "__main__":
    iface = create_interface()
    iface.queue().launch()