File size: 5,842 Bytes
125214f
 
 
 
 
 
 
 
 
 
 
 
 
1a4d79e
125214f
1a4d79e
125214f
 
c3dcca1
125214f
c3dcca1
125214f
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1a4d79e
125214f
 
 
 
 
 
 
 
1a4d79e
125214f
 
 
 
1a4d79e
125214f
 
1a4d79e
125214f
 
 
 
1a4d79e
 
125214f
 
 
 
 
 
 
 
 
1a4d79e
125214f
 
 
 
 
 
1a4d79e
 
125214f
 
 
 
 
 
 
 
 
 
 
 
 
 
1a4d79e
 
125214f
1a4d79e
125214f
1a4d79e
 
125214f
1a4d79e
125214f
 
 
 
1a4d79e
 
125214f
1a4d79e
 
125214f
 
1a4d79e
 
 
 
125214f
 
 
 
 
1a4d79e
125214f
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import streamlit as st
import torch
import bitsandbytes
import accelerate
import scipy
import copy
from PIL import Image
import torch.nn as nn
import pandas as pd
from my_model.object_detection import detect_and_draw_objects
from my_model.captioner.image_captioning import get_caption
from my_model.gen_utilities import free_gpu_resources
from my_model.KBVQA import KBVQA, prepare_kbvqa_model
from my_model.utilities.st_utils import UIManager, StateManager

# Module-level StateManager instance; run_inference() reads it as a global.
state_manager = StateManager()

def answer_question(caption, detected_objects_str, question, model):
    """Generate an answer to a question about an already analyzed image.

    GPU resources are released before generation and again afterwards. The
    second release sits in a ``finally`` clause so it also runs when
    ``model.generate_answer`` raises; the exception still propagates.

    Args:
        caption: Caption text for the image.
        detected_objects_str: Detected-objects string for the image.
        question: The question to answer.
        model: Object exposing
            ``generate_answer(question, caption, detected_objects_str)``.

    Returns:
        Whatever ``model.generate_answer`` returns.
    """
    free_gpu_resources()
    try:
        return model.generate_answer(question, caption, detected_objects_str)
    finally:
        # Always release GPU resources, even if generation failed.
        free_gpu_resources()


# Paths of the sample images offered in the UI:
# Files/sample1.jpg through Files/sample7.jpg.
sample_images = [f"Files/sample{number}.jpg" for number in range(1, 8)]



def analyze_image(image, model):
    """Caption an image and detect the objects in it.

    The model is given a deep copy of ``image`` so that the caller's object
    is not modified.

    Args:
        image: The image to analyze.
        model: Object exposing ``get_caption(img)`` and
            ``detect_objects(img)``; the latter must return a pair
            ``(image_with_boxes, detected_objects_str)``.

    Returns:
        A tuple ``(caption, detected_objects_str, image_with_boxes)``.
    """
    working_copy = copy.deepcopy(image)

    caption = model.get_caption(working_copy)
    boxed_image, objects_description = model.detect_objects(working_copy)

    st.text("I am ready, let's talk!")
    free_gpu_resources()

    return caption, objects_description, boxed_image
    

def image_qa_app(state_manager, kbvqa):
    """Render the image selection, analysis and question-answering UI.

    The page is built in three parts: a row of sample-image thumbnails with
    a select button each, a file uploader, and then one section per image
    returned by ``state_manager.get_images_data()`` with an analyze button,
    a question box (once the image is analyzed) and its Q&A history.

    Args:
        state_manager: Object providing ``process_new_image``,
            ``get_images_data``, ``analyze_image``, ``update_image_data``
            and ``add_to_qa_history``.
        kbvqa: The model passed on to image processing, analysis and
            ``answer_question``.
    """
    # --- Sample gallery: one column per sample image -----------------------
    st.write("Choose from sample images:")
    columns = st.columns(len(sample_images))
    for position, sample_path in enumerate(sample_images):
        with columns[position]:
            thumbnail = Image.open(sample_path)
            st.image(thumbnail, use_column_width=True)
            picked = st.button(f'Select Sample Image {position + 1}', key=f'sample_{position}')
            if picked:
                state_manager.process_new_image(sample_path, thumbnail, kbvqa)

    # --- User upload --------------------------------------------------------
    upload = st.file_uploader("Or upload an Image", type=["png", "jpg", "jpeg"])
    if upload is not None:
        state_manager.process_new_image(upload.name, Image.open(upload), kbvqa)

    # --- One interactive section per registered image -----------------------
    for key, entry in state_manager.get_images_data().items():
        st.image(entry['image'], caption=f'Uploaded Image: {key[-11:]}', use_column_width=True)

        if not entry['analysis_done']:
            st.text("Cool image, please click 'Analyze Image'..")
            if st.button('Analyze Image', key=f'analyze_{key}'):
                caption, objects_description, _ = state_manager.analyze_image(entry['image'], kbvqa)
                state_manager.update_image_data(key, caption, objects_description, True)

        # Q&A pairs recorded so far for this image (empty when none stored).
        history = entry.get('qa_history', [])

        # 'analysis_done' is read again on purpose: it is looked up after the
        # analyze step above has had a chance to run.
        if entry['analysis_done']:
            question = st.text_input(f"Ask a question about this image ({key[-11:]}):", key=f'question_{key}')
            # Only query the model for questions not already in the history.
            if st.button('Get Answer', key=f'answer_{key}') and question not in [q for q, _ in history]:
                answer = answer_question(entry['caption'], entry['detected_objects_str'], question, kbvqa)
                state_manager.add_to_qa_history(key, question, answer)

        # Show the recorded Q&A pairs for this image.
        for asked, reply in history:
            st.text(f"Q: {asked}\nA: {reply}\n")




def process_new_image(image_key, image, kbvqa):
    """Register a new image in ``st.session_state['images_data']``.

    Nothing happens when ``image_key`` is already present. Otherwise a fresh
    record is stored with an empty caption, an empty detected-objects
    string, an empty Q&A history and ``analysis_done`` set to ``False``.

    Args:
        image_key: Key under which the image record is stored.
        image: The image object to store.
        kbvqa: Accepted but not used by this function.
    """
    images = st.session_state['images_data']
    if image_key in images:
        return

    fresh_record = {
        'image': image,
        'caption': '',
        'detected_objects_str': '',
        'qa_history': [],
        'analysis_done': False,
    }
    images[image_key] = fresh_record

def run_inference():
    """Render the "Run Inference" page.

    Lets the user choose the inference method, the object-detection model
    and the minimum detection confidence, loads (or reloads) the model on
    request via the module-level ``state_manager``, and, once a model is
    loaded, shows the current settings/session state and the image Q&A UI.

    Only the "Fine-Tuned Model" method is wired up; any other choice just
    shows a placeholder message.
    """
    st.title("Run Inference")

    method = st.selectbox("Choose a method:", ["Fine-Tuned Model", "In-Context Learning (n-shots)"], index=0)
    detection_model = st.selectbox("Choose a model for objects detection:", ["yolov5", "detic"], index=1)
    # The slider's starting value depends on the chosen detector.
    default_confidence = 0.2 if detection_model == "yolov5" else 0.4
    confidence_level = st.slider("Select minimum detection confidence level", min_value=0.1, max_value=0.9, value=default_confidence, step=0.1)

    # NOTE(review): the settings are stored before they are compared below;
    # confirm that check_settings_changed can still detect a change.
    state_manager.update_model_settings(detection_model, confidence_level, method)
    settings_changed = state_manager.check_settings_changed(method, detection_model, confidence_level)

    need_model_reload = settings_changed and state_manager.is_model_loaded()
    button_label = "Reload Model" if need_model_reload else "Load Model"

    if method == "Fine-Tuned Model":
        if st.button(button_label):
            if state_manager.is_model_loaded() and not settings_changed:
                st.write("Model already loaded.")
            else:
                st.text("Loading the model, please wait...")
                state_manager.load_model(detection_model, confidence_level)
                st.write("Model is ready for inference.")

        if state_manager.is_model_loaded():
            state_manager.display_model_settings()
            state_manager.display_session_state()
            # image_qa_app takes (state_manager, kbvqa); passing only the
            # model raised TypeError for the missing second argument.
            image_qa_app(state_manager, state_manager.get_model())

    else:
        st.write('Model is not ready yet, will be updated later.')



def display_model_settings():
    """Show ``st.session_state['model_settings']`` as a table.

    The settings mapping is wrapped in a DataFrame built with ``index=[0]``
    and rendered under a "Current Model Settings" heading.
    """
    st.write("### Current Model Settings:")
    settings_frame = pd.DataFrame(st.session_state['model_settings'], index=[0])
    st.table(settings_frame)

def display_session_state():
    """Show every entry of ``st.session_state`` as a Key/Value table.

    Each value is converted with ``str()`` before display.
    """
    st.write("### Current Session State:")

    # One row per session-state entry.
    rows = []
    for name, stored in st.session_state.items():
        rows.append({'Key': name, 'Value': str(stored)})

    st.table(pd.DataFrame(rows))