from typing import Dict, Any

from flows.base_flows import CircularFlow
from flows.utils import logging

from .ControllerAtomicFlow import ControllerAtomicFlow

logging.set_verbosity_debug()
log = logging.get_logger(__name__)


class ControllerExecutorFlow(CircularFlow):
    """Circular flow that stops early when the controller issues ``finish``.

    The class adds two hooks on top of ``CircularFlow``: one that records an
    "unfinished" result when the round limit is hit, and one that inspects
    each controller output payload for the ``"finish"`` command.
    """

    def _on_reach_max_round(self):
        """Store a fallback answer and an ``"unfinished"`` status in the state."""
        fallback_state = {
            "answer": "The maximum amount of rounds was reached before the model found an answer.",
            "status": "unfinished"
        }
        self._state_update_dict(fallback_state)

    @CircularFlow.output_msg_payload_processor
    def detect_finish_or_continue(self, output_payload: Dict[str, Any], src_flow: ControllerAtomicFlow) -> Dict[str, Any]:
        """Turn a ``finish`` command into an early-exit payload.

        Args:
            output_payload: Payload to inspect; must contain a ``"command"``
                key and, when the command is ``"finish"``, a
                ``"command_args"`` mapping with an ``"answer"`` entry.
            src_flow: Flow that produced the payload (not used in this body).

        Returns:
            ``output_payload`` unchanged when the command is not ``"finish"``;
            otherwise a new dict with ``"EARLY_EXIT"`` set to ``True``, the
            controller's answer, and ``"status"`` set to ``"finished"``.

        Raises:
            KeyError: If a required key is missing from ``output_payload``.
        """
        is_finish = output_payload["command"] == "finish"

        # Any other command is passed through untouched.
        if not is_finish:
            return output_payload

        final_answer = output_payload["command_args"]["answer"]
        return {
            "EARLY_EXIT": True,
            "answer": final_answer,
            "status": "finished"
        }