"""Alert monitoring service.

Reads alert entries from a Redis stream, turns each one into a natural-language
description, stores it (with its embedding) in a MongoDB Atlas vector
collection, and shows the latest alert and a running count in a Gradio UI.
"""

import asyncio
import logging
import os
import threading
import time

import gradio as gr
import nest_asyncio
import pymongo
import redis
from langchain.docstore.document import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import MongoDBAtlasVectorSearch

# Config
nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)
database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"

# Environment variables
MONGO_URI = os.getenv('MONGO_URI')
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PWD = os.getenv('REDIS_PWD')

# Embedding model
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True},
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)

# MongoDB connection
connection = pymongo.MongoClient(MONGO_URI)
alert_collection = connection[database][collection]

# Redis connection
r = redis.Redis(host=REDIS_HOST, password=REDIS_PWD, port=16652)

# Global variables to store alert information.
# Written by the listener thread, read by the Gradio callbacks.
latest_alert = "No alerts yet."
alert_count = 0


# Preprocessing
def create_textual_description(entry_data):
    """Build a human-readable sentence from a raw Redis stream entry.

    Args:
        entry_data: Mapping of field name to value as returned by ``xread``;
            both keys and values are expected to be ``bytes``.

    Returns:
        A ``(description, entry_dict)`` tuple, where ``entry_dict`` is the
        entry with keys and values decoded to ``str``.

    Raises:
        KeyError: If one of the required fields (``Category``, ``CreatedAt``,
            ``Acknowledged``, ``Remedy``, ``Severity``, ``Source``, ``node``)
            is missing from the entry.
    """
    entry_dict = {k.decode(): v.decode() for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = f"A {severity} alert of category {category} was raised from the {source} source for node {node} at {created_at}. The alert is {acknowledged}. The recommended remedy is: {remedy}."
    return description, entry_dict


# Saving alert doc
def save(entry):
    """Embed one alert and insert it into the MongoDB vector collection.

    Args:
        entry: Dict with ``"content"`` (the text to embed) and ``"metadata"``
            (a dict stored alongside the document).
    """
    MongoDBAtlasVectorSearch.from_documents(
        documents=[Document(
            page_content=entry["content"],
            metadata=entry["metadata"],
        )],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alert stored successfully!")


def _initial_stream_id():
    """Return the stream ID after which the listener should start reading.

    Polling repeatedly with ``$`` can miss entries that arrive between two
    ``xread`` calls, so a concrete ID is resolved once up front: the ID of the
    newest existing entry, or ``0-0`` when the stream has no entries.
    Falls back to ``$`` if Redis cannot be queried.
    """
    try:
        newest = r.xrevrange(stream_name, count=1)
    except redis.RedisError:
        logging.exception("Could not resolve the starting stream ID; falling back to '$'")
        return '$'
    return newest[0][0] if newest else "0-0"


# Listening to alert stream
def listen_to_alerts():
    """Consume the Redis alert stream forever, storing each new alert.

    Updates the module-level ``latest_alert`` and ``alert_count`` for every
    alert that was processed successfully. Failures are logged and the loop
    keeps running, so one bad entry or a transient outage does not stop
    ingestion.
    """
    global latest_alert, alert_count
    last_id = _initial_stream_id()

    while True:
        try:
            entries = r.xread({stream_name: last_id}, block=1000, count=None)
        except redis.RedisError:
            logging.exception("Reading from stream %r failed; retrying", stream_name)
            time.sleep(1)  # back off so an outage does not become a busy loop
            continue

        if not entries:
            continue

        _stream, new_entries = entries[0]
        for entry_id, entry_data in new_entries:
            # Advance first so a malformed entry is skipped, not retried forever.
            last_id = entry_id
            try:
                description, entry_dict = create_textual_description(entry_data)
                save({
                    "content": description,
                    "metadata": entry_dict,
                })
            except Exception:
                # Thread boundary: log and keep consuming rather than dying silently.
                logging.exception("Failed to process alert %r; skipping", entry_id)
                continue
            latest_alert = description
            alert_count += 1


# Start listening to alerts in a separate thread
threading.Thread(target=listen_to_alerts, daemon=True).start()


# Function to get current stats
def get_current_stats():
    """Return the latest alert text and a formatted total-count string."""
    return latest_alert, f"Total Alerts: {alert_count}"


# Gradio interface
def create_interface():
    """Build the Gradio Blocks UI showing the latest alert and alert count.

    Returns:
        The constructed ``gr.Blocks`` instance (not yet launched).
    """
    with gr.Blocks() as iface:
        gr.Markdown("# Alert Monitoring Service")
        with gr.Row():
            latest_alert_md = gr.Markdown("Waiting for alerts...")
        with gr.Row():
            alert_count_md = gr.Markdown("Total Alerts: 0")

        def update_stats():
            """Yield fresh stats once per second for as long as the client listens."""
            while True:
                time.sleep(1)  # Update every second
                yield get_current_stats()

        # NOTE(review): the generator above never finishes, so `every=1` is
        # probably redundant here - confirm against the installed Gradio version.
        iface.load(update_stats, None, [latest_alert_md, alert_count_md], every=1)

    return iface


# Launch the app
if __name__ == "__main__":
    iface = create_interface()
    iface.queue().launch()