# Source: "Create app.py" -- commit 6e8bd08 (verified), author ankush-003, 3.74 kB.
import os
import gradio as gr
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain_community.embeddings import HuggingFaceEmbeddings
import pymongo
import logging
import nest_asyncio
from langchain.docstore.document import Document
import redis
import asyncio
import threading
import time
# Config
# Everything in this section runs at import time, in this order.
# nest_asyncio.apply() patches asyncio to permit nested event loops.
# NOTE(review): no visible code here drives asyncio directly -- confirm it is still needed.
nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)
# MongoDB database / collection names and the Redis stream key read by the listener.
database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"
# Environment variables
# os.getenv returns None for an unset variable; nothing below validates these.
# NOTE(review): consider failing fast when MONGO_URI / REDIS_HOST are missing.
MONGO_URI = os.getenv('MONGO_URI')
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PWD = os.getenv('REDIS_PWD')
# Embedding model
# Keyword arguments forwarded verbatim to HuggingFaceEmbeddings below.
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True}
}
# Constructed once at import and shared by every save() call.
embedding_model = HuggingFaceEmbeddings(**embedding_args)
# MongoDB connection
connection = pymongo.MongoClient(MONGO_URI)
# Collection handle that save() hands to the vector store.
alert_collection = connection[database][collection]
# Redis connection
# NOTE(review): the port is hard-coded to 16652 while host and password come
# from the environment -- confirm this is intentional.
r = redis.Redis(host=REDIS_HOST, password=REDIS_PWD, port=16652)
# Global variables to store alert information
# Written by listen_to_alerts() and read by get_current_stats().
latest_alert = "No alerts yet."
alert_count = 0
# Preprocessing
def create_textual_description(entry_data):
    """Turn one raw alert entry into a sentence plus its decoded fields.

    Args:
        entry_data: Mapping of field name to value for a single alert. Keys
            and values may be ``bytes`` (decoded with the default codec) or
            already ``str``; anything else is passed through unchanged.
            The fields ``Category``, ``CreatedAt``, ``Acknowledged``,
            ``Remedy``, ``Severity``, ``Source`` and ``node`` are required.

    Returns:
        A ``(description, entry_dict)`` tuple: the human-readable sentence
        describing the alert, and the entry with bytes decoded to text.

    Raises:
        KeyError: If any required field is missing from ``entry_data``.
    """

    def _as_text(value):
        # Only bytes-like values need decoding; str input is used as-is so the
        # function works regardless of how the Redis client was configured.
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value

    entry_dict = {_as_text(k): _as_text(v) for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    # The flag is compared as text: only the exact string "1" counts as acknowledged.
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = (
        f"A {severity} alert of category {category} was raised from the {source} "
        f"source for node {node} at {created_at}. The alert is {acknowledged}. "
        f"The recommended remedy is: {remedy}."
    )
    return description, entry_dict
# Saving alert doc
def save(entry):
    """Embed one alert and store it in the MongoDB Atlas vector collection.

    Args:
        entry: Dict with a ``"content"`` key (the text to embed) and a
            ``"metadata"`` key (stored on the document alongside the text).

    Raises:
        KeyError: If ``entry`` lacks ``"content"`` or ``"metadata"``.
    """
    document = Document(
        page_content=entry["content"],
        metadata=entry["metadata"],
    )
    # from_documents is called for its side effect (embedding + insert); the
    # vector-store object it returns is not needed here, so it is not kept.
    MongoDBAtlasVectorSearch.from_documents(
        documents=[document],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alert stored successfully!")
# Listening to alert stream
def listen_to_alerts():
    """Consume the Redis alert stream forever, storing each alert as it arrives.

    For every new entry the alert is converted to text, saved to the vector
    store, and the module globals ``latest_alert`` / ``alert_count`` are
    updated. The function never returns; it is meant to run in a background
    thread.

    Failures are logged and survived rather than allowed to end the loop:
    a Redis error triggers a retry after a short pause, and an entry that
    cannot be parsed or saved is skipped.
    """
    global latest_alert, alert_count

    # Resolved to a concrete stream ID on the first successful poll. Sending
    # "$" on every XREAD would miss entries added between two calls, so the
    # cursor is pinned once to the stream's current tail instead.
    last_id = None

    while True:
        try:
            if last_id is None:
                tail = r.xrevrange(stream_name, count=1)
                # Empty or missing stream: "0-0" precedes every possible entry.
                last_id = tail[0][0] if tail else "0-0"
            # Block for up to 1000 ms waiting for entries newer than last_id.
            entries = r.xread({stream_name: last_id}, block=1000, count=None)
        except redis.RedisError:
            logging.exception("Reading from Redis stream %r failed; retrying.", stream_name)
            time.sleep(1)  # avoid a tight retry loop while Redis is unreachable
            continue

        if not entries:
            continue

        # Only one stream is requested, so the reply has a single element.
        _stream, new_entries = entries[0]
        for entry_id, entry_data in new_entries:
            # Advance the cursor first so a bad entry is never re-read.
            last_id = entry_id
            try:
                description, entry_dict = create_textual_description(entry_data)
                save({
                    "content": description,
                    "metadata": entry_dict,
                })
            except Exception:
                # Thread boundary: log with traceback and keep consuming, since
                # an uncaught error here would silently stop all alert intake.
                logging.exception("Failed to process alert %r; skipping.", entry_id)
                continue
            latest_alert = description
            alert_count += 1
# Start listening to alerts in a separate thread.
# This statement is at module level, so the listener starts as soon as the
# module is imported, not only when it is run as a script. daemon=True means
# the thread does not keep the interpreter alive at shutdown.
threading.Thread(target=listen_to_alerts, daemon=True).start()
# Function to get current stats
def get_current_stats():
    """Snapshot the alert state for display.

    Returns:
        A 2-tuple of the most recent alert description and a
        ``"Total Alerts: <n>"`` line, both read from the module globals.
    """
    total_line = f"Total Alerts: {alert_count}"
    return (latest_alert, total_line)
# Gradio interface
def create_interface():
    """Build the Gradio UI that shows the latest alert and the running total.

    The two Markdown components are refreshed by re-running ``update_stats``
    once per second via the ``every=1`` argument of the page-load event.

    Returns:
        The constructed ``gr.Blocks`` app (not yet launched).
    """
    with gr.Blocks() as iface:
        gr.Markdown("# Alert Monitoring Service")
        with gr.Row():
            latest_alert_md = gr.Markdown("Waiting for alerts...")
        with gr.Row():
            alert_count_md = gr.Markdown("Total Alerts: 0")

        def update_stats():
            # Plain function that returns immediately: `every=1` already
            # supplies the once-per-second cadence. A never-ending generator
            # here would hold a queue worker per session indefinitely.
            return get_current_stats()

        iface.load(update_stats, None, [latest_alert_md, alert_count_md], every=1)
    return iface
# Launch the app
if __name__ == "__main__":
    # Script entry point: build the UI, turn on the request queue, then serve.
    iface = create_interface()
    iface.queue()
    iface.launch()