|
|
|
from typing import Dict, Any |
|
from flow_modules.aiflows.ChatFlowModule import ChatAtomicFlow |
|
from aiflows.utils.general_helpers import encode_image, encode_from_buffer
|
import cv2 |
|
|
|
|
|
class VisionAtomicFlow(ChatAtomicFlow): |
|
""" This class implements the atomic flow for the VisionFlowModule. It is a flow that, given a textual input, and a set of images and/or videos, generates a textual output. |
|
It uses the litellm library as a backend. See https://docs.litellm.ai/docs/providers for supported models and APIs. |
|
|
|
*Configuration Parameters*: |
|
|
|
- `name` (str): The name of the flow. Default: "VisionAtomicFlow" |
|
- `description` (str): A description of the flow. This description is used to generate the help message of the flow. |
|
Default: "A flow that, given a textual input, and a set of images and/or videos, generates a textual output." |
|
    - `enable_cache` (bool): If True, the flow will use the cache. Default: True
|
- `n_api_retries` (int): The number of times to retry the API call in case of failure. Default: 6 |
|
- `wait_time_between_api_retries` (int): The time to wait between API retries in seconds. Default: 20 |
|
- `system_name` (str): The name of the system. Default: "system" |
|
- `user_name` (str): The name of the user. Default: "user" |
|
- `assistant_name` (str): The name of the assistant. Default: "assistant" |
|
    - `backend` (Dict[str, Any]): The configuration of the backend, which is used to fetch API keys. Default: LiteLLMBackend with the

        default parameters of ChatAtomicFlow (see the Flow card of ChatAtomicFlowModule), except for the following parameters,

        whose default values are overwritten:
|
- `api_infos` (List[Dict[str, Any]]): The list of api infos. Default: No default value, this parameter is required. |
|
- `model_name` (Union[Dict[str,str],str]): The name of the model to use. |
|
When using multiple API providers, the model_name can be a dictionary of the form |
|
{"provider_name": "model_name"}. |
|
Default: "gpt-4-vision-preview" (the name needs to follow the name of the model in litellm https://docs.litellm.ai/docs/providers). |
|
        - `n` (int): The number of answers to generate. Default: 1
|
- `max_tokens` (int): The maximum number of tokens to generate. Default: 2000 |
|
- `temperature` (float): The temperature to use. Default: 0.3 |
|
        - `top_p` (float): An alternative to sampling with temperature (nucleus sampling): the model considers only the tokens

            comprising the top `top_p` probability mass. Default: 0.2
|
        - `frequency_penalty` (float): Positive values penalize tokens based on their frequency in the text so far, making the model less likely to repeat itself. Default: 0.0
|
        - `presence_penalty` (float): Positive values penalize tokens that have already appeared in the text, making the model more likely to talk about new topics. Default: 0.0
|
    - `system_message_prompt_template` (Dict[str,Any]): The template of the system message. It is used to generate the system message.

        By default it is of type aiflows.prompt_template.JinjaPrompt.

        None of the prompt's parameters are defined by default, so they must be set in order to use the system prompt.

        Default parameters are defined in aiflows.prompt_template.jinja2_prompts.JinjaPrompt.
|
    - `init_human_message_prompt_template` (Dict[str,Any]): The prompt template of the human/user message used to initialize the conversation

        (the first time in). It is used to generate the human message and is passed as the user message to the LLM.

        By default it is of type aiflows.prompt_template.JinjaPrompt. None of the prompt's parameters are defined by default, so they must be set

        in order to use the init_human_message_prompt_template. Default parameters are defined in aiflows.prompt_template.jinja2_prompts.JinjaPrompt.
|
    - `previous_messages` (Dict[str,Any]): Defines which previous messages to include in the input of the LLM. Note that if `first_k` and `last_k` are both None,

        all the messages of the flow's history are added to the input of the LLM. Default:
|
- `first_k` (int): If defined, adds the first_k earliest messages of the flow's chat history to the input of the LLM. Default: None |
|
- `last_k` (int): If defined, adds the last_k latest messages of the flow's chat history to the input of the LLM. Default: None |
|
- Other parameters are inherited from the default configuration of ChatAtomicFlow (see Flow card of ChatAtomicFlowModule). |
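
    *Example configuration overrides* (illustrative values only; the `api_infos` entry is a placeholder and its exact fields follow aiflows' ApiInfo, so adapt it to your setup):

        config_overrides = {
            "backend": {
                "api_infos": [{"backend_used": "openai", "api_key": "<YOUR_API_KEY>"}],
                "model_name": "gpt-4-vision-preview",
                "n": 1,
                "max_tokens": 2000,
                "temperature": 0.3,
            },
        }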
|
|
|
    *Input Interface Initialized (expected input the first time in the flow)*:
|
|
|
- `query` (str): The textual query to run the model on. |
|
- `data` (Dict[str, Any]): The data (images or video) to run the model on. It can contain the following keys: |
|
- `images` (List[Dict[str, Any]]): A list of images to run the model on. Each image is a dictionary that contains the following keys: |
|
- `type` (str): The type of the image. It can be "local_path" or "url". |
|
            - `image` (str): The image. If type is "local_path", it is a local path to the image. If type is "url", it is a URL to the image.
|
- `video` (Dict[str, Any]): A video to run the model on. It is a dictionary that contains the following keys: |
|
            - `video_path` (str): The path to the video.

            - `resize` (int): The resize value to apply to the frames of the video. Default: 768

            - `frame_step_size` (int): Keep every `frame_step_size`-th frame of the video (to send to the model). Default: 10

            - `start_frame` (int): The first frame of the video to send to the model. Default: 0

            - `end_frame` (int): The last frame of the video to send to the model. Default: None (the end of the video)
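
    For example, a minimal first input with two images (illustrative values):

        input_data = {
            "query": "What is shown in these images?",
            "data": {
                "images": [
                    {"type": "url", "image": "https://example.com/cat.png"},
                    {"type": "local_path", "image": "images/dog.jpg"},
                ],
            },
        }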
|
|
|
    *Input Interface (expected input after the first time in the flow)*:
|
|
|
- `query` (str): The textual query to run the model on. |
|
- `data` (Dict[str, Any]): The data (images or video) to run the model on. It can contain the following keys: |
|
- `images` (List[Dict[str, Any]]): A list of images to run the model on. Each image is a dictionary that contains the following keys: |
|
- `type` (str): The type of the image. It can be "local_path" or "url". |
|
            - `image` (str): The image. If type is "local_path", it is a local path to the image. If type is "url", it is a URL to the image.
|
- `video` (Dict[str, Any]): A video to run the model on. It is a dictionary that contains the following keys: |
|
            - `video_path` (str): The path to the video.

            - `resize` (int): The resize value to apply to the frames of the video. Default: 768

            - `frame_step_size` (int): Keep every `frame_step_size`-th frame of the video (to send to the model). Default: 10

            - `start_frame` (int): The first frame of the video to send to the model. Default: 0

            - `end_frame` (int): The last frame of the video to send to the model. Default: None (the end of the video)
|
|
|
*Output Interface*: |
|
|
|
    - `api_output` (str): The API output of the flow, i.e. the model's response to the query and data.
|
|
|
""" |
|
@staticmethod |
|
def get_image(image): |
|
""" This method returns an image in the appropriate format for API. |
|
|
|
:param image: The image dictionary. |
|
:type image: Dict[str, Any] |
|
        :return: The image entry in the format expected by the API (a data URL for local files, or the image URL as-is).

        :rtype: Dict[str, Any]
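
        Example (illustrative URL):

            >>> VisionAtomicFlow.get_image({"type": "url", "image": "https://example.com/cat.png"})
            {'type': 'image_url', 'image_url': {'url': 'https://example.com/cat.png'}}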
|
""" |
|
extension_dict = { |
|
"jpg": "jpeg", |
|
"jpeg": "jpeg", |
|
"png": "png", |
|
"webp": "webp", |
|
"gif": "gif" |
|
} |
|
        supported_image_types = ["local_path", "url"]
        assert image.get("type", None) in supported_image_types, (
            f"Every image must define a valid type.\n"
            f"Your type: {image.get('type', None)}\n"
            f"Supported types: {supported_image_types}"
        )
|
|
|
        url = None
        if image["type"] == "local_path":
            # Base64-encode the local file and wrap it in a data URL
            # (no whitespace is allowed after "base64,").
            processed_image = encode_image(image.get("image"))
            image_extension_type = image.get("image").split(".")[-1]
            url = f"data:image/{extension_dict[image_extension_type]};base64,{processed_image}"
        elif image["type"] == "url":
            # Remote images are passed through by URL.
            url = image.get("image")
|
|
|
return {"type": "image_url", "image_url": {"url": url}} |
|
|
|
@staticmethod |
|
def get_video(video): |
|
""" This method returns the video in the appropriate format for API. |
|
|
|
:param video: The video dictionary. |
|
:type video: Dict[str, Any] |
|
        :return: The sampled video frames, each base64-encoded with a resize hint.

        :rtype: List[Dict[str, Any]]
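
        Example (illustrative path; keeps every 30th frame):

            frames = VisionAtomicFlow.get_video({"video_path": "clips/demo.mp4", "frame_step_size": 30})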
|
""" |
|
        video_path = video["video_path"]
        resize = video.get("resize", 768)
        frame_step_size = video.get("frame_step_size", 10)
        start_frame = video.get("start_frame", 0)
        end_frame = video.get("end_frame", None)
        # Decode every frame and base64-encode it as a JPEG
        # (`capture` avoids shadowing the `video` argument).
        base64_frames = []
        capture = cv2.VideoCapture(video_path)
        while capture.isOpened():
            success, frame = capture.read()
            if not success:
                break
            _, buffer = cv2.imencode(".jpg", frame)
            base64_frames.append(encode_from_buffer(buffer))
        capture.release()
        # Keep every frame_step_size-th frame between start_frame and end_frame.
        return [{"image": frame, "resize": resize}
                for frame in base64_frames[start_frame:end_frame:frame_step_size]]
|
|
|
@staticmethod |
|
def get_user_message(prompt_template, input_data: Dict[str, Any]): |
|
""" This method constructs the user message to be passed to the API. |
|
|
|
:param prompt_template: The prompt template to use. |
|
:type prompt_template: PromptTemplate |
|
:param input_data: The input data. |
|
:type input_data: Dict[str, Any] |
|
        :return: The constructed user message content (the text entry, followed by any video frames and images).
|
:rtype: Dict[str, Any] |
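
        The returned list starts with the text entry, followed by any video frames and image entries, e.g. (shape only):

            [{"type": "text", "text": "..."},
             {"image": "<base64 frame>", "resize": 768},
             {"type": "image_url", "image_url": {"url": "https://..."}}]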
|
""" |
|
        # Start with the rendered text message.
        content = VisionAtomicFlow._get_message(prompt_template=prompt_template, input_data=input_data)
        media_data = input_data["data"]
        if "video" in media_data:
            # Append the sampled, base64-encoded video frames after the text.
            content = [content[0], *VisionAtomicFlow.get_video(media_data["video"])]
        if "images" in media_data:
            # Append each image entry (data URL or remote URL).
            images = [VisionAtomicFlow.get_image(image) for image in media_data["images"]]
            content.extend(images)
|
return content |
|
|
|
@staticmethod |
|
def _get_message(prompt_template, input_data: Dict[str, Any]): |
|
""" This method constructs the textual message to be passed to the API. |
|
|
|
:param prompt_template: The prompt template to use. |
|
:type prompt_template: PromptTemplate |
|
:param input_data: The input data. |
|
:type input_data: Dict[str, Any] |
|
        :return: The constructed textual message, as a single-element content list.

        :rtype: List[Dict[str, str]]
|
""" |
|
template_kwargs = {} |
|
for input_variable in prompt_template.input_variables: |
|
template_kwargs[input_variable] = input_data[input_variable] |
|
msg_content = prompt_template.format(**template_kwargs) |
|
return [{"type": "text", "text": msg_content}] |
|
|
|
def _process_input(self, input_data: Dict[str, Any]): |
|
""" This method processes the input data (prepares the messages to send to the API). |
|
|
|
:param input_data: The input data. |
|
:type input_data: Dict[str, Any] |
|
        :return: None; the constructed user message is appended to the flow's chat history.
|
""" |
|
        if self._is_conversation_initialized():
            # The conversation is already running: use the regular human message template.
            user_message_content = self.get_user_message(self.human_message_prompt_template, input_data)
        else:
            # First time in the flow: initialize the conversation (e.g. the system message) first.
            self._initialize_conversation(input_data)
            if getattr(self, "init_human_message_prompt_template", None) is not None:
                # A dedicated template for the first user message is configured.
                user_message_content = self.get_user_message(self.init_human_message_prompt_template, input_data)
            else:
                user_message_content = self.get_user_message(self.human_message_prompt_template, input_data)
|
|
|
self._state_update_add_chat_message(role=self.flow_config["user_name"], |
|
content=user_message_content) |