File size: 4,069 Bytes
61fd43f
 
 
6e8bd08
 
 
 
 
 
 
61fd43f
 
6e8bd08
61fd43f
 
 
6e8bd08
 
 
 
61fd43f
 
 
6e8bd08
61fd43f
6e8bd08
61fd43f
 
 
6e8bd08
 
 
61fd43f
 
6e8bd08
 
 
61fd43f
6e8bd08
 
61fd43f
6e8bd08
 
 
 
 
 
 
 
 
 
 
 
 
 
 
61fd43f
6e8bd08
 
 
 
 
 
 
 
 
61fd43f
6e8bd08
 
61fd43f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
6e8bd08
 
 
61fd43f
 
 
 
 
 
 
 
6e8bd08
61fd43f
 
 
 
 
 
6e8bd08
 
61fd43f
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from langchain_mongodb import MongoDBAtlasVectorSearch
from langchain_huggingface import HuggingFaceEmbeddings
# from dotenv import load_dotenv
import os
import pymongo
import logging
import nest_asyncio
from langchain.docstore.document import Document
import redis
import threading
import asyncio
import gradio as gr

# config
# nest_asyncio.apply()
logging.basicConfig(level = logging.INFO)
database = "AlertSimAndRemediation"
collection = "alert_embed"
stream_name = "alerts"

# Global variables to store alert information
latest_alert = "No alerts yet."
alert_count = 0

# embedding model
embedding_args = {
    "model_name" : "BAAI/bge-large-en-v1.5",
    "model_kwargs" : {"device": "cpu"},
    "encode_kwargs" : {"normalize_embeddings": True}
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)

# Mongo Connection
connection = pymongo.MongoClient(os.environ["MONGO_URI"])
alert_collection = connection[database][collection]

# Redis connection
r = redis.Redis(host=os.environ['REDIS_HOST'], password=os.environ['REDIS_PWD'], port=16652)

# Preprocessing
async def create_textual_description(entry_data):
    entry_dict = {k.decode(): v.decode() for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = f"A {severity} alert of category {category} was raised from the {source} source for node {node} at {created_at}. The alert is {acknowledged}. The recommended remedy is: {remedy}."

    return description, entry_dict

# Saving alert doc
async def save(entry):
    vector_search = MongoDBAtlasVectorSearch.from_documents(
        documents=[Document(
            page_content=entry["content"],
            metadata=entry["metadata"]
        )],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alerts stored successfully!")

# Listening to alert stream
async def listen_to_alerts(r):
    global latest_alert, alert_count
    try:
        last_id = '$'

        while True:
            entries = r.xread({stream_name: last_id}, block=0, count=None)

            if entries:
                stream, new_entries = entries[0]

                for entry_id, entry_data in new_entries:
                    description, entry_dict = await create_textual_description(entry_data)
                    await save({
                        "content" : description,
                        "metadata" : entry_dict
                    })
                    print(description)
                    latest_alert = description
                    alert_count += 1
                    # Update the last ID read
                    last_id = entry_id
                    await asyncio.sleep(1) 

    except KeyboardInterrupt:
        print("Exiting...")

def run_alert_listener():
    asyncio.run(listen_to_alerts(r))

# Start the alert listener thread
alert_thread = threading.Thread(target=run_alert_listener)
alert_thread.start()

# gradio interface
# Gradio interface
def get_latest_alert():
    global latest_alert, alert_count
    return latest_alert, f"Total Alerts: {alert_count}"

with gr.Blocks() as app:
    gr.Markdown("# Alert Dashboard 🔔")
    
    with gr.Row():
        latest_alert_box = gr.Textbox(label="Latest Alert", lines=3, interactive=False)
        alert_count_box = gr.Textbox(label="Alert Count", interactive=False)
    
    refresh_button = gr.Button("Refresh")
    
    refresh_button.click(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box])
    
    app.load(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box])

    # Auto-refresh every 5 seconds
    app.load(get_latest_alert, inputs=None, outputs=[latest_alert_box, alert_count_box], every=5)

# Launch the app
# if __name__ == "__main__":
app.launch()