|
import gradio as gr |
|
import openai |
|
import time |
|
import re |
|
import os |
|
from PIL import Image |
|
from transformers import LlavaProcessor, LlavaForConditionalGeneration, TextIteratorStreamer |
|
from threading import Thread |
|
import torch |
|
|
|
|
|
# Remote chat models selectable in the UI; the chosen name is passed as the
# ``model`` argument of the chat-completions request.
MODELS = [
    "Meta-Llama-3.1-405B-Instruct",
    "Meta-Llama-3.1-70B-Instruct",
    "Meta-Llama-3.1-8B-Instruct",
]

# Base URL given to the OpenAI-compatible client.
API_BASE = "https://api.sambanova.ai/v1"

# Local vision-language checkpoint used for image analysis. The processor and
# the weights are loaded once, at import time, and the model is kept on CPU.
model_id = "llava-hf/llava-interleave-qwen-0.5b-hf"
processor = LlavaProcessor.from_pretrained(model_id)
model = LlavaForConditionalGeneration.from_pretrained(model_id)
model.to("cpu")
|
|
|
def create_client(api_key=None):
    """Create an OpenAI-compatible client pointed at ``API_BASE``.

    Args:
        api_key: Key to authenticate with. When falsy (``None`` or ``""``),
            the value of the ``API_KEY`` environment variable is used instead.

    Returns:
        An ``openai.OpenAI`` client configured with the resolved key and
        ``API_BASE`` as its base URL.
    """
    # Resolve the key into a local rather than writing it to the module-level
    # ``openai.api_key``: that attribute is shared by the whole process, so
    # two concurrent callers with different keys could overwrite each other's
    # key between the assignment and the client construction.
    resolved_key = api_key or os.getenv("API_KEY")
    return openai.OpenAI(api_key=resolved_key, base_url=API_BASE)
|
|
|
def chat_with_ai(message, chat_history, system_prompt):
    """Build the message list for a chat-completions request.

    Args:
        message: The new user message; always appended last with role "user".
        chat_history: Previous turns, or ``None``/empty. Each entry may be:
            * a dict with ``"role"`` and ``"content"`` keys (the shape used by
              a ``type="messages"`` chatbot) - forwarded with only those two
              keys, so extra fields such as ``"metadata"`` are not sent;
            * any other non-empty dict - the value of its first key is sent
              as the user turn and the value of its last key as the assistant
              turn (the original behavior, kept for compatibility);
            * a ``(user, assistant)`` pair - either side may be ``None`` and
              is then skipped.
        system_prompt: Content of the leading "system" message.

    Returns:
        A list of ``{"role": ..., "content": ...}`` dicts.

    Raises:
        ValueError: If a non-dict history entry cannot be unpacked into
            exactly two items.
    """
    messages = [{"role": "system", "content": system_prompt}]

    for entry in chat_history or []:
        if isinstance(entry, dict):
            if "role" in entry and "content" in entry:
                # Messages-format turn: keep its own role instead of guessing
                # one from key order.
                messages.append({"role": entry["role"], "content": entry["content"]})
                continue
            keys = list(entry)
            if not keys:
                continue
            user_text, assistant_text = entry[keys[0]], entry[keys[-1]]
            messages.append({"role": "user", "content": user_text})
            messages.append({"role": "assistant", "content": assistant_text})
            continue

        user_text, assistant_text = entry
        if user_text is not None:
            messages.append({"role": "user", "content": user_text})
        if assistant_text is not None:
            messages.append({"role": "assistant", "content": assistant_text})

    messages.append({"role": "user", "content": message})
    return messages
|
|
|
def llava_image_processing(image, prompt):
    """Stream the Llava model's response to an image and a text prompt.

    Args:
        image: Anything ``PIL.Image.open`` accepts (for example a file path).
        prompt: User text inserted after the ``<image>`` placeholder.

    Yields:
        The text generated so far, growing with each chunk the streamer
        produces.
    """
    gr.Info("Analyzing image")

    # ``convert`` returns a new image, so the source file can be closed as
    # soon as the RGB copy exists instead of staying open until collected.
    with Image.open(image) as source_image:
        rgb_image = source_image.convert("RGB")

    formatted_prompt = f"<|im_start|>user <image>\n{prompt}<|im_end|><|im_start|>assistant"

    # Keyword arguments keep the call independent of the processor's
    # positional parameter order.
    inputs = processor(text=formatted_prompt, images=rgb_image, return_tensors="pt")
    streamer = TextIteratorStreamer(processor, skip_prompt=True, skip_special_tokens=True)
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=1024)

    # Generation runs in a worker thread so this generator can forward text
    # while the model is still producing it.
    thread = Thread(target=model.generate, kwargs=generation_kwargs)
    thread.start()

    buffer = ""
    for new_text in streamer:
        buffer += new_text
        yield buffer

    # The streamer is exhausted once generation has finished; reap the worker.
    thread.join()
|
|
|
def respond(message, chat_history, model, system_prompt, thinking_budget, api_key):
    """Send the conversation to the chat-completions API and time the call.

    Args:
        message: The new user message.
        chat_history: Previous turns, passed to ``chat_with_ai``.
        model: Name of the remote model to query.
        system_prompt: Prompt template; formatted with
            ``budget=thinking_budget`` before use.
        thinking_budget: Value substituted for ``{budget}`` in the template.
        api_key: Key passed to ``create_client`` (may be ``None``).

    Returns:
        A ``(text, elapsed_seconds)`` tuple. On success ``text`` is the
        model's reply (``""`` when the API returns no content). On any
        failure ``text`` is ``"Error: <details>"``; callers detect failures
        by that prefix.
    """
    # perf_counter is monotonic, so the interval cannot go negative or jump
    # when the wall clock is adjusted.
    start_time = time.perf_counter()

    try:
        # Client creation and prompt formatting sit inside the try so that a
        # missing key or a malformed template is reported through the same
        # "Error: ..." channel as an API failure.
        client = create_client(api_key)
        messages = chat_with_ai(message, chat_history, system_prompt.format(budget=thinking_budget))
        completion = client.chat.completions.create(model=model, messages=messages)
        # ``content`` can be None; normalise to a string because callers use
        # string methods on the result.
        response = completion.choices[0].message.content or ""
    except Exception as e:  # boundary: every failure becomes an error string
        return f"Error: {str(e)}", time.perf_counter() - start_time

    return response, time.perf_counter() - start_time
|
|
|
def parse_response(response):
    """Split a model reply into its answer, reflection, and step sections.

    Args:
        response: Raw reply text, optionally containing ``<answer>``,
            ``<reflection>`` and ``<step>`` tagged sections.

    Returns:
        ``(answer, reflection, steps)`` where ``answer`` and ``reflection``
        are the stripped contents of the first matching tag (``""`` for a
        missing reflection) and ``steps`` is the list of all ``<step>``
        bodies, unstripped. When there is no non-empty answer, returns
        ``(response, "", "")`` instead.
    """
    def first_section(pattern):
        # Stripped body of the first match, or "" when the tag is absent.
        found = re.search(pattern, response, re.DOTALL)
        if found is None:
            return ""
        return found.group(1).strip()

    final_answer = first_section(r'<answer>(.*?)</answer>')
    thoughts = first_section(r'<reflection>(.*?)</reflection>')
    step_bodies = re.findall(r'<step>(.*?)</step>', response, re.DOTALL)

    if not final_answer:
        return response, "", ""

    return final_answer, thoughts, step_bodies
|
|
|
def generate(message, history, model, thinking_budget, api_key=None):
    """Produce the updated chat history for one user message.

    Args:
        message: The new user message.
        history: Existing chat history as a list of message dicts, or
            ``None``/empty.
        model: Name of the remote model to query.
        thinking_budget: Value substituted into the system prompt template.
        api_key: Optional key forwarded to ``respond``.

    Returns:
        A ``(new_history, "")`` tuple. ``new_history`` is ``history`` plus the
        user message and either an error message, or a "thinking" message
        (steps and reflection, titled with the elapsed time) followed by the
        answer. The empty string is the second output value.
    """
    history = list(history or [])
    system_prompt = DEFAULT_SYSTEM_PROMPT

    response, thinking_time = respond(message, history, model, system_prompt, thinking_budget, api_key)

    user_turn = {"role": "user", "content": message}

    if response.startswith("Error:"):
        # Return plain message dicts like the success path does (previously a
        # dict wrapped in a 1-tuple), and keep the user's message visible.
        # NOTE(review): the error is shown with the "assistant" role on the
        # assumption that every chatbot version renders it - confirm.
        return history + [user_turn, {"role": "assistant", "content": response}], ""

    answer, reflection, steps = parse_response(response)

    formatted_steps = [f"Step {i}: {step}" for i, step in enumerate(steps, 1)]
    all_steps = "\n".join(formatted_steps) + f"\n\nReflection: {reflection}"

    thinking_turn = {
        "role": "assistant",
        "content": all_steps,
        "metadata": {"title": f"Thinking Time: {thinking_time:.2f} sec"},
    }
    answer_turn = {"role": "assistant", "content": answer}

    return history + [user_turn, thinking_turn, answer_turn], ""
|
|
|
|
|
# System prompt sent as the first message of every request. ``respond`` calls
# ``.format(budget=...)`` on it, so any literal brace added here must be
# doubled (``{{`` / ``}}``).
DEFAULT_SYSTEM_PROMPT = """
You are D-LOGIC, an advanced AI assistant created by Rafał Dembski, a passionate self-learner in programming and artificial intelligence. Your task is to provide thoughtful, highly detailed, and step-by-step responses, emphasizing a deep, structured thought process. **Your answers should always follow these key principles**:

- **Proficient in Language**: Always analyze and adapt to the user's language and cultural context, ensuring clarity and engagement.
- **Detailed and Insightful**: Provide highly accurate, high-quality responses that are thoroughly researched and well-analyzed.
- **Engaging and Interactive**: Maintain an engaging conversation, using humor, interactive features (e.g., quizzes, polls), and emotional intelligence.
- **Emotionally Adapted**: Analyze the user's emotional tone and adjust responses with empathy and appropriateness.
- **Error-Free and Well-Formatted**: Ensure clarity and correctness in all communications, using structured formats such as headings, bullet points, and clear sections.
"""
|
|
|
|
|
|
|
# Build the Gradio interface and wire its events.
with gr.Blocks() as demo:

    gr.Markdown("# D-LOGIC: Twój Inteligentny Asystent AI")
    gr.Markdown("""
    **D-LOGIC** to zaawansowany asystent AI stworzony przez Rafała Dembskiego. Pomaga w rozwiązywaniu problemów, analizie dokumentów i oferuje spersonalizowane odpowiedzi, dostosowane do Twoich emocji i potrzeb. Możesz także przesłać obraz do analizy!
    """)

    with gr.Row():
        # Not named ``model``: a ``with`` block shares the module scope, so
        # that name would overwrite the Llava model used for image analysis.
        model_dropdown = gr.Dropdown(choices=MODELS, label="Wybierz Model", value=MODELS[0])
        thinking_budget = gr.Slider(minimum=1, maximum=100, value=25, step=1, label="Budżet Myślenia", info="Maksymalna liczba kroków, które model może przemyśleć")

    chatbot = gr.Chatbot(label="Chat", show_label=False, show_share_button=False, show_copy_button=True, likeable=True, layout="panel", type="messages")

    with gr.Row():
        msg = gr.Textbox(label="Wpisz swoją wiadomość...", placeholder="Wprowadź swoją wiadomość...")
        image_input = gr.File(label="Prześlij obraz do analizy (opcjonalnie)")

    submit_button = gr.Button("Wyślij")
    clear_button = gr.Button("Wyczyść Chat")

    # Reset both the conversation and the text box.
    clear_button.click(lambda: ([], ""), inputs=None, outputs=[chatbot, msg])

    def handle_message_or_image(message, image, chatbot, model_name, thinking_budget):
        """Route a submission to image analysis or to the chat model.

        Written as a generator so the image branch can stream partial text
        into the chat. Every yield is a ``(chat_history, "")`` pair matching
        the ``[chatbot, msg]`` outputs.
        """
        history = list(chatbot or [])
        if image:
            history.append({"role": "user", "content": message})
            # Show the user's message immediately, then replace the trailing
            # assistant message with each longer partial result.
            yield history, ""
            for partial_text in llava_image_processing(image, message):
                yield history + [{"role": "assistant", "content": partial_text}], ""
        else:
            yield generate(message, history, model_name, thinking_budget)

    submit_button.click(fn=handle_message_or_image, inputs=[msg, image_input, chatbot, model_dropdown, thinking_budget], outputs=[chatbot, msg])

demo.launch(share=True, show_api=False)
|
|