nbaldwin committed on
Commit
4f9991d
1 Parent(s): fcfe6cc

coflows compatible

Browse files
Files changed (3) hide show
  1. demo.yaml +4 -5
  2. fixed_reply.py +9 -8
  3. run.py +78 -33
demo.yaml CHANGED
@@ -1,6 +1,5 @@
1
- flow:
2
- _target_: flow_modules.aiflows.FixedReplyFlowModule.FixedReplyFlow.instantiate_from_default_config
3
- name: "FixedReplyFlow"
4
- description: "A demo of the FixedReplyFlow."
5
- fixed_reply: "This is a fixed reply."
6
 
 
1
+ _target_: flow_modules.aiflows.FixedReplyFlowModule.FixedReplyFlow.instantiate_from_default_config
2
+ name: "FixedReplyFlow"
3
+ description: "A demo of the FixedReplyFlow."
4
+ fixed_reply: "This is a fixed reply."
 
5
 
fixed_reply.py CHANGED
@@ -1,7 +1,7 @@
1
  from typing import Dict, Any
2
 
3
  from aiflows.base_flows import AtomicFlow
4
-
5
 
6
  class FixedReplyFlow(AtomicFlow):
7
  """ This class implements a FixedReplyFlow. It's used to reply with a fixed reply.
@@ -38,14 +38,15 @@ class FixedReplyFlow(AtomicFlow):
38
  super().__init__(**kwargs)
39
 
40
  def run(self,
41
- input_data: Dict[str, Any]) -> Dict[str, Any]:
42
  """ Runs the FixedReplyFlow. It's used to reply with a fixed reply.
43
 
44
- :param input_data: The input data dictionary
45
- :type input_data: Dict[str, Any]
46
- :return: The fixed reply
47
- :rtype: Dict[str, Any]
48
  """
49
-
50
- return {"fixed_reply": self.flow_config["fixed_reply"]}
 
 
 
51
 
 
1
  from typing import Dict, Any
2
 
3
  from aiflows.base_flows import AtomicFlow
4
+ from aiflows.messages import FlowMessage
5
 
6
  class FixedReplyFlow(AtomicFlow):
7
  """ This class implements a FixedReplyFlow. It's used to reply with a fixed reply.
 
38
  super().__init__(**kwargs)
39
 
40
  def run(self,
41
+ input_message: FlowMessage):
42
  """ Runs the FixedReplyFlow. It's used to reply with a fixed reply.
43
 
44
+ :param input_message: The input message
45
+ :type input_message: FlowMessage
 
 
46
  """
47
+ reply = self._package_output_message(
48
+ input_message=input_message,
49
+ response={"fixed_reply": self.flow_config["fixed_reply"]}
50
+ )
51
+ self.reply_to_message(reply=reply, to=input_message)
52
 
run.py CHANGED
@@ -1,58 +1,103 @@
 
 
1
  import os
2
 
3
  import hydra
4
 
 
5
  from aiflows.flow_launchers import FlowLauncher
6
- from aiflows.utils.general_helpers import read_yaml_file
 
7
 
8
  from aiflows import logging
9
- from aiflows.flow_cache import CACHING_PARAMETERS
 
 
 
 
 
 
 
10
 
11
- CACHING_PARAMETERS.do_caching = False # Set to True to enable caching
12
  # clear_cache() # Uncomment this line to clear the cache
13
 
14
  logging.set_verbosity_debug()
15
- logging.auto_set_dir()
16
 
17
  dependencies = [
18
- {"url": "aiflows/FixedReplyFlowModule", "revision": os.getcwd()},
19
  ]
 
20
  from aiflows import flow_verse
21
  flow_verse.sync_dependencies(dependencies)
22
-
23
  if __name__ == "__main__":
 
 
 
 
 
24
 
 
 
25
  root_dir = "."
26
  cfg_path = os.path.join(root_dir, "demo.yaml")
27
  cfg = read_yaml_file(cfg_path)
28
- flow_with_interfaces = {
29
- "flow": hydra.utils.instantiate(cfg['flow'], _recursive_=False, _convert_="partial"),
30
- "input_interface": (
31
- None
32
- if getattr(cfg, "input_interface", None) is None
33
- else hydra.utils.instantiate(cfg['input_interface'], _recursive_=False)
34
- ),
35
- "output_interface": (
36
- None
37
- if getattr(cfg, "output_interface", None) is None
38
- else hydra.utils.instantiate(cfg['output_interface'], _recursive_=False)
39
- ),
40
- }
41
-
42
- # ~~~ Get the data ~~~
43
- # This can be a list of samples
44
- data = {"id": 0} # Add your data here
45
-
46
- # ~~~ Run inference ~~~
47
- path_to_output_file = None
48
- # path_to_output_file = "output.jsonl" # Uncomment this line to save the output to disk
49
-
50
- _, outputs = FlowLauncher.launch(
51
- flow_with_interfaces=flow_with_interfaces,
 
 
 
 
 
 
52
  data=data,
53
- path_to_output_file=path_to_output_file,
54
  )
55
 
 
 
 
 
 
 
 
 
 
 
56
  # ~~~ Print the output ~~~
57
- flow_output_data = outputs[0]
58
- print(flow_output_data)
 
 
 
 
 
 
 
 
 
 
 
 
 
1
+ """A simple script to run a Flow that can be used for development and debugging."""
2
+
3
  import os
4
 
5
  import hydra
6
 
7
+ import aiflows
8
  from aiflows.flow_launchers import FlowLauncher
9
+ from aiflows.backends.api_info import ApiInfo
10
+ from aiflows.utils.general_helpers import read_yaml_file, quick_load_api_keys
11
 
12
  from aiflows import logging
13
+ from aiflows.flow_cache import CACHING_PARAMETERS, clear_cache
14
+
15
+ from aiflows.utils import serve_utils
16
+ from aiflows.workers import run_dispatch_worker_thread
17
+ from aiflows.messages import FlowMessage
18
+ from aiflows.interfaces import KeyInterface
19
+ from aiflows.utils.colink_utils import start_colink_server
20
+ from aiflows.workers import run_dispatch_worker_thread
21
 
22
+ CACHING_PARAMETERS.do_caching = False # Set to True in order to disable caching
23
  # clear_cache() # Uncomment this line to clear the cache
24
 
25
  logging.set_verbosity_debug()
26
+
27
 
28
  dependencies = [
29
+ {"url": "aiflows/FixedReplyFlowModule", "revision": os.getcwd()}
30
  ]
31
+
32
  from aiflows import flow_verse
33
  flow_verse.sync_dependencies(dependencies)
 
34
  if __name__ == "__main__":
35
+
36
+ #1. ~~~~~ Set up a colink server ~~~~
37
+ FLOW_MODULES_PATH = "./"
38
+
39
+ cl = start_colink_server()
40
 
41
+
42
+ #2. ~~~~~Load flow config~~~~~~
43
  root_dir = "."
44
  cfg_path = os.path.join(root_dir, "demo.yaml")
45
  cfg = read_yaml_file(cfg_path)
46
+
47
+
48
+ #3. ~~~~ Serve The Flow ~~~~
49
+ serve_utils.recursive_serve_flow(
50
+ cl = cl,
51
+ flow_type="FixedReplyFlowModule",
52
+ default_config=cfg,
53
+ default_state=None,
54
+ default_dispatch_point="coflows_dispatch"
55
+ )
56
+
57
+ #4. ~~~~~Start A Worker Thread~~~~~
58
+ run_dispatch_worker_thread(cl, dispatch_point="coflows_dispatch", flow_modules_base_path=FLOW_MODULES_PATH)
59
+
60
+ #5. ~~~~~Mount the flow and get its proxy~~~~~~
61
+ proxy_flow = serve_utils.recursive_mount(
62
+ cl=cl,
63
+ client_id="local",
64
+ flow_type="FixedReplyFlowModule",
65
+ config_overrides=None,
66
+ initial_state=None,
67
+ dispatch_point_override=None,
68
+ )
69
+
70
+ #6. ~~~ Get the data ~~~
71
+ data = {"id": 0}
72
+
73
+
74
+ #option1: use the FlowMessage class
75
+ input_message = FlowMessage(
76
  data=data,
 
77
  )
78
 
79
+ #option2: use the proxy_flow
80
+ #input_message = proxy_flow._package_input_message(data = data)
81
+
82
+ #7. ~~~ Run inference ~~~
83
+ future = proxy_flow.send_message_blocking(input_message)
84
+
85
+ #uncomment this line if you would like to get the full message back
86
+ #reply_message = future.get_message()
87
+ reply_data = future.get_data()
88
+
89
  # ~~~ Print the output ~~~
90
+ print("~~~~~~Reply~~~~~~")
91
+ print(reply_data)
92
+
93
+
94
+ #8. ~~~~ (Optional) apply output interface on reply ~~~~
95
+ # output_interface = KeyInterface(
96
+ # keys_to_rename={"api_output": "answer"},
97
+ # )
98
+ # print("Output: ", output_interface(reply_data))
99
+
100
+
101
+ #9. ~~~~~Optional: Unserve Flow~~~~~~
102
+ # serve_utils.delete_served_flow(cl, "FlowModule")
103
+