import base64
import functools
import io
import operator
from typing import Annotated, List, Literal, Sequence, TypedDict

import gradio as gr
import PIL.Image
from dotenv import load_dotenv
from langchain_community.tools import DuckDuckGoSearchRun
from langchain_core.messages import AIMessage, BaseMessage, HumanMessage, ToolMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool
from langchain_experimental.utilities import PythonREPL
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_groq import ChatGroq
from langgraph.graph import END, START, StateGraph
from langgraph.prebuilt import ToolNode

load_dotenv()
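
# Two model backends: a fast, text-only Groq Llama model for reviewing code,
# and a multimodal Gemini model for the steps that must read uploaded images.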
llm_coder = ChatGroq(temperature=0, model_name="llama-3.1-8b-instant")
llm_image = ChatGoogleGenerativeAI(
    model="gemini-1.5-flash",
    temperature=0,
    max_tokens=None,
    timeout=None,
    max_retries=2,
)
search_tool = DuckDuckGoSearchRun()
repl_tool = PythonREPL()
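
# NOTE: PythonREPL executes arbitrary Python in the local process; do not
# expose this tool to untrusted input.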
@tool
def python_repl(
    code: Annotated[str, "The python code to execute to answer the question."],
):
    """Use this to execute python code. If you want to see the output of a value,
    you should print it out with `print(...)`. This is visible to the user."""
    try:
        result = repl_tool.run(code)
    except BaseException as e:
        return f"Failed to execute. Error: {repr(e)}"
    result_str = f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
    return (
        result_str + "\n\nIf you have completed all tasks, respond with FINAL ANSWER."
    )

def create_agent(llm, tools, system_message: str):
    """Create an agent bound to the given tools and system message."""
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                "You are a helpful AI assistant, collaborating with other assistants."
                " Use the provided tools to progress towards answering the question."
                " If you are unable to fully answer, that's OK; another assistant with"
                " different tools will help where you left off. Execute what you can to"
                " make progress."
                " If you or any of the other assistants have the final answer or deliverable,"
                " prefix your response with FINAL ANSWER so the team knows to stop."
                " You have access to the following tools: {tool_names}.\n{system_message}",
            ),
            MessagesPlaceholder(variable_name="messages"),
        ]
    )
    prompt = prompt.partial(system_message=system_message)
    prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools]))
    return prompt | llm.bind_tools(tools)
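
# Shared graph state: `messages` accumulates across nodes (operator.add
# concatenates rather than overwrites) and `sender` records which agent spoke
# last, so tool output can be routed back to it.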
class AgentState(TypedDict):
    messages: Annotated[Sequence[BaseMessage], operator.add]
    sender: str

def agent_node(state, agent, name):
    result = agent.invoke(state)
    # Tool messages pass through untouched; model replies are re-tagged with
    # the agent's name so the transcript shows who said what.
    if not isinstance(result, ToolMessage):
        result = AIMessage(**result.dict(exclude={"type", "name"}), name=name)
    return {
        "messages": [result],
        "sender": name,
    }
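
# Three collaborating agents: one describes the problem, one proposes a Python
# solution, and one reviews it. The first two run on the multimodal Gemini
# model so they can read the uploaded images; the reviewer runs on Groq.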
problem_agent = create_agent(
    llm_image,
    [],
    system_message="Understand the problem fully and provide a clear description of it, including its edge cases. Do not provide the solution yourself.",
)
problem_node = functools.partial(agent_node, agent=problem_agent, name="problem_agent")

solution_agent = create_agent(
    llm_image,
    [],
    system_message="After understanding the problem, provide a clear and concise Python solution that handles all edge cases, along with the intuition behind it.",
)
solution_node = functools.partial(agent_node, agent=solution_agent, name="solution_agent")

checker_agent = create_agent(
    llm_coder,
    [],
    system_message="Critically analyze the solution provided by the solution agent, checking for correctness, efficiency, and edge-case handling. If the solution is correct, say so; if not, describe the error and suggest a fix.",
)
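
# The Groq model is text-only, so before invoking the checker we strip the
# base64 image parts out of any multimodal messages, keeping just their text.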
def checker_node(state):
    text_only_messages = []
    for msg in state["messages"]:
        if isinstance(msg.content, list):
            # Multimodal content: keep only the text parts.
            text_content = [
                item["text"] for item in msg.content if item.get("type") == "text"
            ]
            new_msg = msg.copy()
            new_msg.content = " ".join(text_content)
            text_only_messages.append(new_msg)
        else:
            text_only_messages.append(msg)
    text_only_state = {
        "messages": text_only_messages,
        "sender": state["sender"],
    }
    result = checker_agent.invoke(text_only_state)
    if not isinstance(result, ToolMessage):
        result = AIMessage(**result.dict(exclude={"type", "name"}), name="checker_agent")
    return {
        "messages": [result],
        "sender": "checker_agent",
    }
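
# Shared tools; ToolNode executes whatever tool calls the agents emit.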
tools = [search_tool, python_repl]
tool_node = ToolNode(tools)

def router(state) -> Literal["call_tool", "__end__", "continue"]:
    messages = state["messages"]
    last_message = messages[-1]
    if last_message.tool_calls:
        # The previous agent requested a tool; route to the tool node.
        return "call_tool"
    if "FINAL ANSWER" in last_message.content:
        # An agent has signalled that the work is done.
        return "__end__"
    return "continue"
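
# Graph wiring: problem_creator -> solution_generator -> checker_agent, with
# the checker looping back to problem_creator when it is not satisfied. Any
# node may detour through call_tool, which hands control back to the sender.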
workflow = StateGraph(AgentState)
workflow.add_node("problem_creator", problem_node)
workflow.add_node("solution_generator", solution_node)
workflow.add_node("checker_agent", checker_node)
workflow.add_node("call_tool", tool_node)
workflow.add_conditional_edges(
    "problem_creator",
    router,
    {"continue": "solution_generator", "call_tool": "call_tool", "__end__": END},
)
workflow.add_conditional_edges(
    "solution_generator",
    router,
    {"continue": "checker_agent", "call_tool": "call_tool", "__end__": END},
)
workflow.add_conditional_edges(
    "checker_agent",
    router,
    {"continue": "problem_creator", "call_tool": "call_tool", "__end__": END},
)
workflow.add_conditional_edges(
    "call_tool",
    lambda x: x["sender"],
    {
        "problem_creator": "problem_creator",
        "solution_generator": "solution_generator",
        "checker_agent": "checker_agent",
    },
)
workflow.add_edge(START, "problem_creator")
graph = workflow.compile()
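
# A minimal sketch of invoking the compiled graph directly, without the Gradio
# UI, assuming a text-only question (the example prompt is hypothetical; the
# recursion_limit mirrors the one used in process_images below):
#
#   result = graph.invoke(
#       {"messages": [HumanMessage(content="Solve: reverse a linked list.")]},
#       {"recursion_limit": 10},
#   )
#   print(result["messages"][-1].content)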

def process_images(images: List[tuple[PIL.Image.Image, str | None]]):
    if not images:
        return "No images uploaded"
    # Convert all images to base64 data URLs so they can be embedded in the prompt.
    image_contents = []
    for image, _ in images:
        buffered = io.BytesIO()
        image.save(buffered, format="PNG")
        img_str = base64.b64encode(buffered.getvalue()).decode()
        image_contents.append(
            {
                "type": "image_url",
                "image_url": {"url": f"data:image/png;base64,{img_str}"},
            }
        )
    # Create the input for the workflow
    input_data = {
        "messages": [
            HumanMessage(
                content=[
                    {"type": "text", "text": "answer the question about the following images"},
                    *image_contents,
                ]
            )
        ]
    }
    # Run the workflow, collecting each agent's message as it streams.
    output = []
    try:
        for chunk in graph.stream(input_data, {"recursion_limit": 10}, stream_mode="values"):
            message = chunk["messages"][-1]
            output.append(f"{message.name}: {message.content}")
    except Exception as e:
        output.append(f"Error: {repr(e)}")
    return "\n\n".join(output)

# Create the Gradio interface
iface = gr.Interface(
    fn=process_images,
    inputs=[gr.Gallery(label="Upload images", type="pil")],
    outputs=[gr.Markdown(label="Output", show_copy_button=True)],
    title="Image Question Answering",
    description="Upload one or more images of a problem to have it described, solved, and checked.",
)
# Launch the interface
iface.launch()