|
"""A simple script to run a Flow that can be used for development and debugging.""" |
|
|
|
import os |
|
|
|
import hydra |
|
|
|
import aiflows |
|
from aiflows.flow_launchers import FlowLauncher |
|
from aiflows.backends.api_info import ApiInfo |
|
from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys |
|
|
|
from aiflows import logging |
|
from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache |
|
|
|
from aiflows.utils import serve_utils |
|
from aiflows.workers import run_dispatch_worker_thread |
|
from aiflows.messages import FlowMessage |
|
from aiflows.interfaces import KeyInterface |
|
from aiflows.utils.colink_utils import start_colink_server |
|
from aiflows.workers import run_dispatch_worker_thread |
|
|
|
# --- Development/debug settings -------------------------------------------
# Turn flow caching off and raise aiflows' log level to DEBUG before anything
# else in this script runs.
CACHING_PARAMETERS.do_caching = False
logging.set_verbosity_debug()

# --- Flow dependencies ----------------------------------------------------
# The "revision" is the current working directory rather than a fixed string.
# NOTE(review): presumably this makes flow_verse use the local checkout of the
# module instead of a published revision -- confirm against flow_verse docs.
dependencies = [
    dict(url="aiflows/VectorStoreFlowModule", revision=os.getcwd()),
]

# Imported here (not at the top of the file) to keep the original statement
# order: the settings above are applied before the dependencies are synced.
from aiflows import flow_verse

flow_verse.sync_dependencies(dependencies)
|
def _serve_and_mount(
    cl,
    flow_type,
    default_config,
    flow_modules_base_path,
    dispatch_point="coflows_dispatch",
):
    """Serve a flow type, start a dispatch worker for it, and mount an instance.

    Runs, in this order:
      1. ``serve_utils.recursive_serve_flow`` with ``default_state=None``,
      2. ``run_dispatch_worker_thread`` on ``dispatch_point``,
      3. ``serve_utils.recursive_mount`` for ``client_id="local"`` with no
         config/state/dispatch-point overrides.

    Passing ``flow_type`` and ``dispatch_point`` once guarantees that the
    label used for serving is the same one used for mounting.

    Args:
        cl: The object returned by ``start_colink_server()``.
        flow_type: Label under which the flow is served and then mounted.
        default_config: Default configuration handed to the served flow.
        flow_modules_base_path: Forwarded to ``run_dispatch_worker_thread``.
        dispatch_point: Dispatch point used for both serving and the worker.

    Returns:
        Whatever ``serve_utils.recursive_mount`` returns (used by the caller
        as a proxy to send messages to the mounted flow).
    """
    serve_utils.recursive_serve_flow(
        cl=cl,
        flow_type=flow_type,
        default_config=default_config,
        default_state=None,
        default_dispatch_point=dispatch_point,
    )

    run_dispatch_worker_thread(
        cl,
        dispatch_point=dispatch_point,
        flow_modules_base_path=flow_modules_base_path,
    )

    return serve_utils.recursive_mount(
        cl=cl,
        client_id="local",
        flow_type=flow_type,
        config_overrides=None,
        initial_state=None,
        dispatch_point_override=None,
    )


def _run_write_then_read(proxy_flow, title, write_message, read_message):
    """Print a banner, send a write then a read to ``proxy_flow``, print the reply.

    Args:
        proxy_flow: Object returned by ``_serve_and_mount``.
        title: Banner line printed before the messages are sent.
        write_message: Message sent with ``send_message_async``.
        read_message: Message sent with ``send_message_blocking``.

    Returns:
        The data obtained from ``get_data()`` on the blocking call's result.
    """
    print(title)

    # NOTE(review): the write is sent asynchronously and nothing here waits
    # for it before the read is issued -- confirm the served flow handles the
    # two messages in order.
    proxy_flow.send_message_async(write_message)
    future = proxy_flow.send_message_blocking(read_message)

    reply_data = future.get_data()

    print("~~~~~~Reply~~~~~~")
    print(reply_data)
    return reply_data


if __name__ == "__main__":
    # Base path handed to the dispatch workers.
    FLOW_MODULES_PATH = "./"

    cl = start_colink_server()

    # Load the demo configuration that sits next to this script.
    root_dir = "."
    cfg_path = os.path.join(root_dir, "demo.yaml")
    cfg = read_yaml_file(cfg_path)

    # os.getenv returns None when OPENAI_API_KEY is not set; no check is done
    # here, so a missing key is passed through as None.
    api_information = [
        ApiInfo(backend_used="openai", api_key=os.getenv("OPENAI_API_KEY")),
    ]
    quick_load_api_keys(cfg, api_information, key="api_infos")

    # Serve + mount the two flows, ChromaDB first, then the vector store.
    proxy_flow_cdb = _serve_and_mount(
        cl,
        flow_type="ChromaDBFlowModule",
        default_config=cfg["chroma_demo_flow"],
        flow_modules_base_path=FLOW_MODULES_PATH,
    )

    # NOTE(review): "VectoreStoreFlowModule" (sic) is kept byte-for-byte; the
    # same label is used for serving and mounting, so it still resolves.
    proxy_flow_vs = _serve_and_mount(
        cl,
        flow_type="VectoreStoreFlowModule",
        default_config=cfg["vector_store_demo_flow"],
        flow_modules_base_path=FLOW_MODULES_PATH,
    )

    # The same two messages are sent to both flows.
    data_write = {"id": 0, "operation": "write", "content": "The capital of Switzerland is Bern"}
    data_read = {"id": 1, "operation": "read", "content": "Capital of Switzerland"}

    input_message_write = FlowMessage(
        data=data_write,
    )
    input_message_read = FlowMessage(
        data=data_read,
    )

    reply_data = _run_write_then_read(
        proxy_flow_cdb,
        "##########CHROMA DB DEMO###############",
        input_message_write,
        input_message_read,
    )

    reply_data = _run_write_then_read(
        proxy_flow_vs,
        "##########VECTOR STORE DEMO###############",
        input_message_write,
        input_message_read,
    )
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|