File size: 3,738 Bytes
6e8bd08
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
import os
import gradio as gr
from langchain_community.vectorstores import MongoDBAtlasVectorSearch
from langchain_community.embeddings import HuggingFaceEmbeddings
import pymongo
import logging
import nest_asyncio
from langchain.docstore.document import Document
import redis
import asyncio
import threading
import time

# Config
# Patch asyncio so an already-running event loop can be re-entered.
nest_asyncio.apply()
logging.basicConfig(level=logging.INFO)
database = "AlertSimAndRemediation"  # MongoDB database name
collection = "alert_embed"  # MongoDB collection that receives embedded alerts
stream_name = "alerts"  # Redis stream the listener reads from

# Environment variables
MONGO_URI = os.getenv('MONGO_URI')
REDIS_HOST = os.getenv('REDIS_HOST')
REDIS_PWD = os.getenv('REDIS_PWD')
# Optional override; falls back to the previously hard-coded port when the
# variable is unset or empty.
REDIS_PORT = int(os.getenv('REDIS_PORT') or 16652)

# Embedding model
embedding_args = {
    "model_name": "BAAI/bge-large-en-v1.5",
    "model_kwargs": {"device": "cpu"},
    "encode_kwargs": {"normalize_embeddings": True}
}
embedding_model = HuggingFaceEmbeddings(**embedding_args)

# MongoDB connection
connection = pymongo.MongoClient(MONGO_URI)
alert_collection = connection[database][collection]

# Redis connection
r = redis.Redis(host=REDIS_HOST, password=REDIS_PWD, port=REDIS_PORT)

# Global variables to store alert information
# Written by the listener thread, read by the UI callback.
latest_alert = "No alerts yet."
alert_count = 0

# Preprocessing
def create_textual_description(entry_data):
    """Build a one-sentence, human-readable summary of an alert entry.

    Args:
        entry_data: Mapping of field name to value for a single alert. Keys
            and values may be ``bytes`` (decoded with the default codec) or
            already-decoded ``str``; any other value is passed through
            unchanged.

    Returns:
        A ``(description, entry_dict)`` tuple: the summary sentence and the
        entry with its keys and values decoded.

    Raises:
        KeyError: If any of ``Category``, ``CreatedAt``, ``Acknowledged``,
            ``Remedy``, ``Severity``, ``Source`` or ``node`` is missing.
    """
    def _as_text(value):
        # A client created with decode_responses=True already returns str,
        # which has no .decode(); only decode real byte strings.
        if isinstance(value, (bytes, bytearray)):
            return value.decode()
        return value

    entry_dict = {_as_text(k): _as_text(v) for k, v in entry_data.items()}

    category = entry_dict["Category"]
    created_at = entry_dict["CreatedAt"]
    # The stream encodes the acknowledged flag as the string "1".
    acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
    remedy = entry_dict["Remedy"]
    severity = entry_dict["Severity"]
    source = entry_dict["Source"]
    node = entry_dict["node"]

    description = f"A {severity} alert of category {category} was raised from the {source} source for node {node} at {created_at}. The alert is {acknowledged}. The recommended remedy is: {remedy}."

    return description, entry_dict

# Saving alert doc
def save(entry):
    """Embed one alert and insert it into the MongoDB Atlas collection.

    Args:
        entry: Dict with a ``"content"`` key (the text to embed) and a
            ``"metadata"`` key (stored alongside the document).

    Raises:
        KeyError: If ``entry`` lacks ``"content"`` or ``"metadata"``.
    """
    alert_doc = Document(
        page_content=entry["content"],
        metadata=entry["metadata"],
    )
    # Called for its side effect (embed + insert); the vector-store object it
    # returns is not needed here, so it is not bound to a name.
    MongoDBAtlasVectorSearch.from_documents(
        documents=[alert_doc],
        embedding=embedding_model,
        collection=alert_collection,
        index_name="alert_index",
    )
    logging.info("Alert stored successfully!")

# Listening to alert stream
def listen_to_alerts():
    """Consume the Redis alert stream forever, persisting each new alert.

    For every entry read from ``stream_name`` the alert is converted to a
    textual description, stored via ``save`` and reflected in the
    module-level ``latest_alert`` / ``alert_count`` values.

    The loop never returns. Errors are logged instead of propagating so that
    one malformed entry or a transient backend failure does not terminate
    the thread running this function and halt all further ingestion.
    """
    global latest_alert, alert_count
    # '$' asks Redis only for entries added after the first read is issued.
    last_id = '$'

    while True:
        try:
            entries = r.xread({stream_name: last_id}, block=1000, count=None)
        except redis.exceptions.RedisError:
            logging.exception("Failed to read from Redis stream; retrying")
            # Avoid a tight retry loop while Redis is unreachable.
            time.sleep(1)
            continue

        if not entries:
            continue

        # Only one stream is requested, so the reply holds a single item.
        _, new_entries = entries[0]

        for entry_id, entry_data in new_entries:
            # Advance the cursor first so a failing entry is never re-read.
            last_id = entry_id
            try:
                description, entry_dict = create_textual_description(entry_data)
                save({
                    "content": description,
                    "metadata": entry_dict
                })
            except Exception:
                # Boundary of the worker loop: log with traceback and move on.
                logging.exception("Failed to process alert %s; skipping", entry_id)
                continue

            latest_alert = description
            alert_count += 1

# Start listening to alerts in a separate thread
# Daemon thread: it will not keep the process alive once the main thread exits.
_alert_listener_thread = threading.Thread(target=listen_to_alerts, daemon=True)
_alert_listener_thread.start()

# Function to get current stats
def get_current_stats():
    """Return a snapshot of the alert state for display.

    Returns:
        A 2-tuple ``(latest_alert, total_text)`` where ``total_text`` is the
        running alert count formatted as ``"Total Alerts: <n>"``.
    """
    newest_alert = latest_alert
    total_text = f"Total Alerts: {alert_count}"
    return newest_alert, total_text

# Gradio interface
def create_interface():
    """Build the Gradio dashboard showing the latest alert and the total.

    Returns:
        The constructed ``gr.Blocks`` app, not yet launched. The periodic
        refresh relies on ``every=``, so the caller should enable the queue
        (``.queue()``) before launching.
    """
    with gr.Blocks() as iface:
        gr.Markdown("# Alert Monitoring Service")
        with gr.Row():
            latest_alert_md = gr.Markdown("Waiting for alerts...")
        with gr.Row():
            alert_count_md = gr.Markdown("Total Alerts: 0")

        # `every=1` already re-runs the callback each second while the page
        # is open, so the callback only needs to return one snapshot per
        # call. The earlier never-ending generator (sleep + yield inside
        # `while True`) started an event that never completed for each
        # connected client, on top of the polling requested by `every`.
        iface.load(
            get_current_stats,
            None,
            [latest_alert_md, alert_count_md],
            every=1,
        )

    return iface

# Launch the app
if __name__ == "__main__":
    # Build the UI, enable the request queue, then start serving.
    iface = create_interface()
    queued_app = iface.queue()
    queued_app.launch()