|
import streamlit as st |
|
import torch |
|
import bitsandbytes |
|
import accelerate |
|
import scipy |
|
import copy |
|
from PIL import Image |
|
import torch.nn as nn |
|
import pandas as pd |
|
from my_model.object_detection import detect_and_draw_objects |
|
from my_model.captioner.image_captioning import get_caption |
|
from my_model.utilities.gen_utilities import free_gpu_resources |
|
from my_model.state_manager import StateManager |
|
|
|
|
|
class InferenceRunner(StateManager):
    """Streamlit page logic for loading a model and asking questions about images.

    Extends ``StateManager``; the columns (``col1``/``col2``), widget set-up,
    model lifecycle helpers (``load_model``, ``get_model``, ...) and per-image
    bookkeeping (``get_images_data``, ``update_image_data``, ...) used below
    are not defined in this class and are expected to come from that base.
    """

    def __init__(self) -> None:
        """Initialise the inherited state and register the sample image paths."""
        super().__init__()
        self.initialize_state()
        self.sample_images = [
            "Files/sample1.jpg",
            "Files/sample2.jpg",
            "Files/sample3.jpg",
        ]

    def answer_question(self, caption, detected_objects_str, question, model):
        """Ask ``model`` to answer ``question`` about an already analysed image.

        Args:
            caption: Caption stored for the image.
            detected_objects_str: Detected-objects description stored for the image.
            question: The question to answer.
            model: Object exposing
                ``generate_answer(question, caption, detected_objects_str)``.

        Returns:
            Whatever ``model.generate_answer`` returns.
        """
        free_gpu_resources()
        try:
            return model.generate_answer(question, caption, detected_objects_str)
        finally:
            # Run the post-generation cleanup even when generation raises.
            free_gpu_resources()

    def image_qa_app(self, kbvqa) -> None:
        """Render the image selection / upload controls and one Q&A panel per image.

        Args:
            kbvqa: The loaded model; forwarded to ``process_new_image``,
                ``analyze_image`` and ``answer_question``.
        """
        self._render_image_sources(kbvqa)

        with self.col2:
            for image_key, image_data in self.get_images_data().items():
                with st.container():
                    self._render_image_panel(image_key, image_data, kbvqa)

    def _render_image_sources(self, kbvqa) -> None:
        """Render the sample-image picker and the file uploader in ``col1``."""
        # st.columns rejects a spec of 0, so skip the picker when no samples exist.
        if self.sample_images:
            self.col1.write("Choose from sample images:")
            cols = self.col1.columns(len(self.sample_images))
            for idx, sample_image_path in enumerate(self.sample_images):
                with cols[idx]:
                    st.image(self.resize_image(sample_image_path, 80, 80))
                    if st.button(f'Select Sample Image {idx + 1}', key=f'sample_{idx}'):
                        # Open the full image only for the sample that was
                        # actually chosen instead of on every rerun.
                        self.process_new_image(
                            sample_image_path, Image.open(sample_image_path), kbvqa
                        )

        uploaded_image = self.col1.file_uploader(
            "Or upload an Image", type=["png", "jpg", "jpeg"]
        )
        if uploaded_image is not None:
            self.process_new_image(
                uploaded_image.name, Image.open(uploaded_image), kbvqa
            )

    def _render_image_panel(self, image_key, image_data, kbvqa) -> None:
        """Render one image with its analyse button, question box and Q&A history."""
        nested_col21, nested_col22 = st.columns([0.65, 0.35])
        image_for_display = self.resize_image(image_data['image'], 600)
        nested_col21.image(
            image_for_display, caption=f'Uploaded Image: {image_key[-11:]}'
        )

        if not image_data['analysis_done']:
            nested_col22.text("Please click 'Analyze Image'..")
            with nested_col22:
                if st.button(
                    'Analyze Image',
                    key=f'analyze_{image_key}',
                    on_click=self.disable_widgets,
                    disabled=self.is_widget_disabled,
                ):
                    try:
                        caption, detected_objects_str, _image_with_boxes = (
                            self.analyze_image(image_data['image'], kbvqa)
                        )
                        self.update_image_data(
                            image_key, caption, detected_objects_str, True
                        )
                    finally:
                        # Clear the flag even on failure so it is never left set.
                        # NOTE(review): presumably `disable_widgets` sets this flag
                        # and `is_widget_disabled` reads it - confirm in StateManager.
                        st.session_state['loading_in_progress'] = False

        qa_history = image_data.get('qa_history', [])

        # Deliberately re-read rather than using `else`: the flag is looked up
        # again after the analysis step above has had a chance to run.
        if image_data['analysis_done']:
            st.session_state['loading_in_progress'] = False
            question = nested_col22.text_input(
                f"Ask a question about this image ({image_key[-11:]}):",
                key=f'question_{image_key}',
            )
            if nested_col22.button(
                'Get Answer',
                key=f'answer_{image_key}',
                on_click=self.disable_widgets,
                disabled=self.is_widget_disabled,
            ):
                try:
                    if not question or not question.strip():
                        # Do not send an empty prompt to the model or store it.
                        nested_col22.warning(
                            "Please enter a question before requesting an answer."
                        )
                    elif any(asked == question for asked, _ in qa_history):
                        nested_col22.warning("This questions has already been answered.")
                    else:
                        answer = self.answer_question(
                            image_data['caption'],
                            image_data['detected_objects_str'],
                            question,
                            kbvqa,
                        )
                        self.add_to_qa_history(image_key, question, answer)
                finally:
                    st.session_state['loading_in_progress'] = False

        for q, a in qa_history:
            nested_col22.text(f"Q: {q}\nA: {a}\n")

    def display_message(self, message, warning=False, write=False, text=False) -> None:
        """Placeholder that currently ignores all of its arguments and does nothing."""
        pass

    def _run_model_action(self, action) -> None:
        """Call ``free_gpu_resources``, run ``action()`` and always clear the loading flag."""
        free_gpu_resources()
        try:
            action()
        finally:
            # A failed (re)load must not leave 'loading_in_progress' set.
            st.session_state['loading_in_progress'] = False

    def run_inference(self) -> None:
        """Set up the widgets, handle model load / reload requests and show the Q&A UI.

        The primary button is labelled "Reload Model" when a model is loaded
        and the settings changed, otherwise "Load Model". "Force Reload" is
        only honoured when the primary button was not clicked in the same run.
        The image Q&A interface is rendered whenever a model is loaded.
        """
        self.set_up_widgets()

        st.session_state['settings_changed'] = self.has_state_changed()
        if st.session_state['settings_changed']:
            self.col1.warning("Model settings have changed, please reload the model, this will take a second .. ")

        st.session_state.button_label = (
            "Reload Model"
            if self.is_model_loaded() and self.settings_changed
            else "Load Model"
        )

        with self.col1:
            if st.session_state.method == "Fine-Tuned Model":
                with st.container():
                    nested_col11, nested_col12 = st.columns([0.5, 0.5])
                    primary_clicked = nested_col11.button(
                        st.session_state.button_label,
                        on_click=self.disable_widgets,
                        disabled=self.is_widget_disabled,
                    )
                    force_clicked = nested_col12.button(
                        "Force Reload",
                        on_click=self.disable_widgets,
                        disabled=self.is_widget_disabled,
                    )

                if primary_clicked:
                    if st.session_state.button_label != "Load Model":
                        self._run_model_action(self.reload_detection_model)
                    elif self.is_model_loaded():
                        free_gpu_resources()
                        self.col1.text("Model already loaded and no settings were changed:)")
                        st.session_state['loading_in_progress'] = False
                    else:
                        self._run_model_action(self.load_model)
                elif force_clicked:
                    self._run_model_action(self.force_reload_model)

            elif st.session_state.method == "In-Context Learning (n-shots)":
                self.col1.warning(f'Model using {st.session_state.method} is not deployed yet, will be ready later.')
                st.session_state['loading_in_progress'] = False

        if self.is_model_loaded():
            st.session_state['loading_in_progress'] = False
            free_gpu_resources()
            self.image_qa_app(self.get_model())
|
|
|
|