ankush-003's picture
Update app.py
61fd43f verified
raw
history blame
4.07 kB
from langchain_mongodb import MongoDBAtlasVectorSearch
from langchain_huggingface import HuggingFaceEmbeddings
# from dotenv import load_dotenv
import os
import pymongo
import logging
import nest_asyncio
from langchain.docstore.document import Document
import redis
import threading
import asyncio
import gradio as gr
# config
# nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)

# MongoDB database / collection that receive the embedded alerts, and the
# Redis stream the alerts are read from.
database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"

# Global variables to store alert information.
# They are rebound by listen_to_alerts() and read by get_latest_alert().
latest_alert = "No alerts yet."
alert_count = 0

# embedding model
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True},
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)

# Mongo Connection (MONGO_URI must be set in the environment)
connection = pymongo.MongoClient(os.environ["MONGO_URI"])
alert_collection = connection[database][collection]

# Redis connection (REDIS_HOST and REDIS_PWD must be set in the environment).
# The port can be overridden with the optional REDIS_PORT variable; it falls
# back to the previously hard-coded 16652 when the variable is absent.
r = redis.Redis(
    host=os.environ["REDIS_HOST"],
    password=os.environ["REDIS_PWD"],
    port=int(os.environ.get("REDIS_PORT", "16652")),
)
# Preprocessing
def _as_text(value):
    """Return *value* as ``str``, decoding it first when it is bytes."""
    if isinstance(value, (bytes, bytearray)):
        return value.decode()
    return value


async def create_textual_description(entry_data):
    """Turn one raw alert entry into a human-readable sentence.

    Args:
        entry_data: Mapping of field name to field value for one alert.
            Keys and values may be ``bytes`` or already-decoded ``str``, so
            the function works whether or not the Redis client decodes
            responses. The fields "Category", "CreatedAt", "Acknowledged",
            "Remedy", "Severity", "Source" and "node" are required.

    Returns:
        A ``(description, entry_dict)`` tuple: the generated sentence and
        the entry with every key and value as ``str``.

    Raises:
        KeyError: If one of the required fields is missing.
    """
    entry_dict = {_as_text(k): _as_text(v) for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    # The acknowledged flag is the string "1" when set; anything else counts
    # as not acknowledged.
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = f"A {severity} alert of category {category} was raised from the {source} source for node {node} at {created_at}. The alert is {acknowledged}. The recommended remedy is: {remedy}."
    return description, entry_dict
# Saving alert doc
async def save(entry):
    """Store a single alert through the MongoDB Atlas vector-search wrapper.

    Args:
        entry: Mapping with a "content" item (used as the document's page
            content) and a "metadata" item (attached as document metadata).

    The document is handed to ``MongoDBAtlasVectorSearch.from_documents``
    together with the module-level embedding model and alert collection,
    using the "alert_index" index name. Nothing is returned.
    """
    alert_doc = Document(
        page_content=entry["content"],
        metadata=entry["metadata"],
    )
    # The returned vector-store object is not needed afterwards.
    MongoDBAtlasVectorSearch.from_documents(
        documents=[alert_doc],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alerts stored successfully!")
# Listening to alert stream
async def listen_to_alerts(r):
    """Consume new entries from the alert stream forever.

    For every entry read from ``stream_name`` the function builds a textual
    description, stores it via ``save`` and then publishes it through the
    module-level ``latest_alert`` / ``alert_count`` variables.

    A failure while processing one entry (for example a missing field or a
    failed database write) is logged and that entry is skipped, so a single
    bad alert no longer terminates the listener. Errors raised by the stream
    read itself still propagate.

    Args:
        r: Redis client exposing ``xread``.
    """
    global latest_alert, alert_count
    try:
        # '$' asks only for entries added after the listener started.
        last_id = '$'
        while True:
            # block=0 waits without timeout until at least one entry arrives.
            entries = r.xread({stream_name: last_id}, block=0, count=None)
            if entries:
                # Only one stream is requested, so only the first result matters.
                _stream, new_entries = entries[0]
                for entry_id, entry_data in new_entries:
                    try:
                        description, entry_dict = await create_textual_description(entry_data)
                        await save({
                            "content": description,
                            "metadata": entry_dict,
                        })
                    except Exception:
                        # Keep the listener alive; the traceback is logged.
                        logging.exception("Failed to process alert %r; skipping it", entry_id)
                    else:
                        print(description)
                        latest_alert = description
                        alert_count += 1
                    # Update the last ID read (also for skipped entries, so a
                    # bad entry is not fetched again on the next read).
                    last_id = entry_id
            await asyncio.sleep(1)
    except KeyboardInterrupt:
        print("Exiting...")
def run_alert_listener():
    """Thread entry point: run the alert listener on its own event loop.

    Creates the ``listen_to_alerts`` coroutine for the module-level Redis
    client and drives it to completion with ``asyncio.run``.
    """
    listener = listen_to_alerts(r)
    asyncio.run(listener)
# Start the alert listener thread.
# It is a daemon thread because the listener waits indefinitely on the Redis
# stream; a non-daemon thread would keep the interpreter alive after the main
# thread has finished, preventing a clean shutdown.
alert_thread = threading.Thread(target=run_alert_listener, daemon=True)
alert_thread.start()
# Gradio interface
def get_latest_alert():
    """Return the current alert state for display.

    Returns:
        A 2-tuple: the module-level ``latest_alert`` text and the string
        ``"Total Alerts: <alert_count>"``.
    """
    # Both names are only read here, so no ``global`` declaration is needed.
    message = latest_alert
    count_label = f"Total Alerts: {alert_count}"
    return message, count_label
# Dashboard layout: a heading, one row holding the two read-only text boxes,
# and a manual refresh button. Every event below calls get_latest_alert and
# writes its two return values into the two boxes.
with gr.Blocks() as app:
    gr.Markdown("# Alert Dashboard 🔔")
    with gr.Row():
        latest_alert_box = gr.Textbox(label="Latest Alert", lines=3, interactive=False)
        alert_count_box = gr.Textbox(label="Alert Count", interactive=False)
    refresh_button = gr.Button("Refresh")
    # Manual refresh
    refresh_button.click(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box])
    # Fill the boxes when the page is loaded
    app.load(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box])
    # Auto-refresh every 5 seconds
    app.load(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box], every=5)
# Launch the app
# if __name__ == "__main__":
app.launch()