import asyncio
import logging
import os
import threading
import time

import gradio as gr
import nest_asyncio
import pymongo
import redis
from langchain.docstore.document import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
# Config
nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)

database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"
# Environment variables
MONGO_URI = os.getenv('MONGO_URI')
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PWD = os.getenv('REDIS_PWD')
# Embedding model
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True},
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)
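# Note: normalize_embeddings=True produces unit-length vectors, so cosine
# similarity and dot product give the same ranking in the vector index.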
# MongoDB connection
connection = pymongo.MongoClient(MONGO_URI)
alert_collection = connection[database][collection]
# Redis connection
r = redis.Redis(host=REDIS_HOST, password=REDIS_PWD, port=16652)
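# The "alerts" stream is expected to carry entries with the fields consumed in
# create_textual_description() below. An illustrative producer call (field names
# taken from this file; the values are made up):
#
#   r.xadd("alerts", {
#       "Category": "Network",
#       "CreatedAt": "2024-01-01T12:00:00Z",
#       "Acknowledged": "0",
#       "Remedy": "Restart the affected interface.",
#       "Severity": "critical",
#       "Source": "grafana",
#       "node": "node-1",
#   })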
# Global variables to store alert information
latest_alert = "No alerts yet."
alert_count = 0
# Preprocessing
def create_textual_description(entry_data):
    entry_dict = {k.decode(): v.decode() for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = (
        f"A {severity} alert of category {category} was raised from the {source} source "
        f"for node {node} at {created_at}. The alert is {acknowledged}. "
        f"The recommended remedy is: {remedy}."
    )

    return description, entry_dict
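# Example of the resulting description (illustrative values):
#   "A critical alert of category Network was raised from the grafana source for
#    node node-1 at 2024-01-01T12:00:00Z. The alert is Not Acknowledged.
#    The recommended remedy is: Restart the affected interface."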
# Saving alert doc
def save(entry):
    MongoDBAtlasVectorSearch.from_documents(
        documents=[
            Document(
                page_content=entry["content"],
                metadata=entry["metadata"],
            )
        ],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alert stored successfully!")
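# Note: MongoDBAtlasVectorSearch.from_documents() embeds the document with the
# configured model and inserts it into alert_collection. An Atlas Vector Search
# index named "alert_index" is assumed to exist on that collection for similarity
# search to work (BAAI/bge-large-en-v1.5 produces 1024-dimensional embeddings).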
# Listening to alert stream
def listen_to_alerts():
    global latest_alert, alert_count

    last_id = '$'  # only consume entries added after the listener starts

    while True:
        entries = r.xread({stream_name: last_id}, block=1000, count=None)

        if entries:
            stream, new_entries = entries[0]

            for entry_id, entry_data in new_entries:
                description, entry_dict = create_textual_description(entry_data)

                save({
                    "content": description,
                    "metadata": entry_dict,
                })

                latest_alert = description
                alert_count += 1

                last_id = entry_id
# Start listening to alerts in a separate thread
threading.Thread(target=listen_to_alerts, daemon=True).start()
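# Note: xread(block=1000) waits up to one second for new entries before returning,
# so the loop above polls without busy-waiting. Because last_id starts at '$',
# alerts published before this process started are not replayed. The listener runs
# as a daemon thread, so it will not keep the process alive on shutdown.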
# Function to get current stats
def get_current_stats():
    return latest_alert, f"Total Alerts: {alert_count}"
# Gradio interface
def create_interface():
    with gr.Blocks() as iface:
        gr.Markdown("# Alert Monitoring Service")

        with gr.Row():
            latest_alert_md = gr.Markdown("Waiting for alerts...")
        with gr.Row():
            alert_count_md = gr.Markdown("Total Alerts: 0")

        # Stream the latest stats to the two Markdown components once per second.
        def update_stats():
            while True:
                time.sleep(1)  # update every second
                yield get_current_stats()

        iface.load(update_stats, None, [latest_alert_md, alert_count_md])

    return iface
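# Note: update_stats() is a generator, so gr.Blocks.load() streams each yielded
# value to the two Markdown components for as long as the page is open; this
# relies on the queue enabled below via iface.queue().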
# Launch the app
if __name__ == "__main__":
    iface = create_interface()
    iface.queue().launch()