ankush-003 commited on
Commit
6f367b5
β€’
1 Parent(s): 5812f2d

Added redis streams listener

Browse files
Files changed (1) hide show
  1. app.py +119 -7
app.py CHANGED
@@ -32,6 +32,122 @@ index_name = "alert_index"
32
  stream_name = "alerts"
33
  redis_port = 16652
34
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
35
  # Streamlit Application
36
  st.set_page_config(
37
  page_title="ASMR Query Bot πŸ””",
@@ -45,13 +161,9 @@ st.set_page_config(
45
 
46
  st.title('ASMR Query Bot πŸ””')
47
 
48
- # embedding model
49
- embedding_args = {
50
- "model_name" : "BAAI/bge-large-en-v1.5",
51
- "model_kwargs" : {"device": "cpu"},
52
- "encode_kwargs" : {"normalize_embeddings": True}
53
- }
54
- embedding_model = HuggingFaceEmbeddings(**embedding_args)
55
 
56
  # vector search
57
  vector_search = MongoDBAtlasVectorSearch.from_connection_string(
 
32
  stream_name = "alerts"
33
  redis_port = 16652
34
 
35
+ # embedding model
36
+ embedding_args = {
37
+ "model_name" : "BAAI/bge-large-en-v1.5",
38
+ "model_kwargs" : {"device": "cpu"},
39
+ "encode_kwargs" : {"normalize_embeddings": True}
40
+ }
41
+ embedding_model = HuggingFaceEmbeddings(**embedding_args)
42
+
43
+ # Mongo Connection
44
+ connection = pymongo.MongoClient(os.environ["MONGO_URI"])
45
+ alert_collection = connection[database][collection]
46
+
47
+ # redis connection
48
+ r = redis.Redis(host=os.environ['REDIS_HOST'], password=os.environ['REDIS_PWD'], port=port)
49
+
50
+ Sure, here's the entire code with the Streamlit app and the Redis stream listener combined:
51
+ pythonCopy codeimport streamlit as st
52
+ import os
53
+ from collections.abc import Collection
54
+ from langchain.memory import ChatMessageHistory
55
+ from langchain_community.chat_message_histories import (
56
+ StreamlitChatMessageHistory,
57
+ )
58
+ from langchain_groq import ChatGroq
59
+ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
60
+ from langchain_community.vectorstores import MongoDBAtlasVectorSearch
61
+ from langchain_community.embeddings import HuggingFaceEmbeddings
62
+ from langchain.chains import create_history_aware_retriever, create_retrieval_chain
63
+ from langchain.chains.combine_documents import create_stuff_documents_chain
64
+ from langchain.output_parsers import ResponseSchema, StructuredOutputParser
65
+ from langchain_core.runnables.history import RunnableWithMessageHistory
66
+ from langchain_core.chat_history import BaseChatMessageHistory
67
+ from langchain.chains import RetrievalQA
68
+ import nest_asyncio
69
+ nest_asyncio.apply()
70
+
71
+ import pymongo
72
+ import logging
73
+ import nest_asyncio
74
+ from langchain.docstore.document import Document
75
+ import redis
76
+ import threading
77
+
78
+ # Config
79
+ nest_asyncio.apply()
80
+ logging.basicConfig(level=logging.INFO)
81
+ database = "AlertSimAndRemediation"
82
+ collection = "alert_embed"
83
+ stream_name = "alerts"
84
+
85
+ # Embedding model
86
+ embedding_args = {
87
+ "model_name": "BAAI/bge-large-en-v1.5",
88
+ "model_kwargs": {"device": "cpu"},
89
+ "encode_kwargs": {"normalize_embeddings": True}
90
+ }
91
+ embedding_model = HuggingFaceEmbeddings(**embedding_args)
92
+
93
+ # Mongo Connection
94
+ connection = pymongo.MongoClient(os.environ["MONGO_URI"])
95
+ alert_collection = connection[database][collection]
96
+
97
+ # Redis connection
98
+ r = redis.Redis(host=os.environ['REDIS_HOST'], password=os.environ['REDIS_PWD'], port=16652)
99
+
100
+ # Preprocessing
101
+ async def create_textual_description(entry_data):
102
+ entry_dict = {k.decode(): v.decode() for k, v in entry_data.items()}
103
+ category = entry_dict["Category"]
104
+ created_at = entry_dict["CreatedAt"]
105
+ acknowledged = "Acknowledged" if entry_dict["Acknowledged"] == "1" else "Not Acknowledged"
106
+ remedy = entry_dict["Remedy"]
107
+ severity = entry_dict["Severity"]
108
+ source = entry_dict["Source"]
109
+ node = entry_dict["node"]
110
+ description = f"A {severity} alert of category {category} was raised from the {source} source for node {node} at {created_at}. The alert is {acknowledged}. The recommended remedy is: {remedy}."
111
+ return description, entry_dict
112
+
113
+ # Saving alert doc
114
+ async def save(entry):
115
+ vector_search = MongoDBAtlasVectorSearch.from_documents(
116
+ documents=[Document(
117
+ page_content=entry["content"],
118
+ metadata=entry["metadata"]
119
+ )],
120
+ embedding=embedding_model,
121
+ collection=alert_collection,
122
+ index_name="alert_index",
123
+ )
124
+ logging.info("Alerts stored successfully!")
125
+
126
+ # Listening to alert stream
127
+ async def listen_to_alerts(r):
128
+ logging.info("Listening to alerts...")
129
+ try:
130
+ last_id = '$'
131
+ while True:
132
+ entries = r.xread({stream_name: last_id}, block=0, count=None)
133
+ if entries:
134
+ stream, new_entries = entries[0]
135
+ for entry_id, entry_data in new_entries:
136
+ description, entry_dict = await create_textual_description(entry_data)
137
+ await save({"content": description, "metadata": entry_dict})
138
+ st.toast(description, icon='πŸ””')
139
+ # Update the last ID read
140
+ last_id = entry_id
141
+ except KeyboardInterrupt:
142
+ print("Exiting...")
143
+
144
+ # Start Redis listener in a separate thread
145
+ def start_redis_listener():
146
+ try:
147
+ nest_asyncio.run(listen_to_alerts(r))
148
+ except Exception as e:
149
+ print(f"Error in Redis listener: {e}")
150
+
151
  # Streamlit Application
152
  st.set_page_config(
153
  page_title="ASMR Query Bot πŸ””",
 
161
 
162
  st.title('ASMR Query Bot πŸ””')
163
 
164
+ # Start Redis listener in a separate thread
165
+ redis_listener_thread = threading.Thread(target=start_redis_listener)
166
+ redis_listener_thread.start()
 
 
 
 
167
 
168
  # vector search
169
  vector_search = MongoDBAtlasVectorSearch.from_connection_string(