Spaces:
Runtime error
Runtime error
File size: 6,993 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 |
from pydantic import BaseSettings
from typing import Optional, List, Any, Dict, TypeVar, Set, cast, Iterable, Type
from typing_extensions import Literal
from abc import ABC
import importlib
import logging
from overrides import EnforceOverrides, override
from graphlib import TopologicalSorter
import inspect
# The thin client will have a flag to control which implementations to use
is_thin_client = False
try:
from chromadb.is_thin_client import is_thin_client # type: ignore
except ImportError:
is_thin_client = False
logger = logging.getLogger(__name__)
_legacy_config_values = {
"duckdb": "chromadb.db.duckdb.DuckDB",
"duckdb+parquet": "chromadb.db.duckdb.PersistentDuckDB",
"clickhouse": "chromadb.db.clickhouse.Clickhouse",
"rest": "chromadb.api.fastapi.FastAPI",
"local": "chromadb.api.local.LocalAPI",
}
# TODO: Don't use concrete types here to avoid circular deps. Strings are fine for right here!
_abstract_type_keys: Dict[str, str] = {
"chromadb.db.DB": "chroma_db_impl",
"chromadb.api.API": "chroma_api_impl",
"chromadb.telemetry.Telemetry": "chroma_telemetry_impl",
"chromadb.ingest.Producer": "chroma_producer_impl",
"chromadb.ingest.Consumer": "chroma_consumer_impl",
}
class Settings(BaseSettings):
environment: str = ""
chroma_db_impl: str = "chromadb.db.duckdb.DuckDB"
chroma_api_impl: str = "chromadb.api.local.LocalAPI"
chroma_telemetry_impl: str = "chromadb.telemetry.posthog.Posthog"
# New architecture components
chroma_sysdb_impl: str = "chromadb.db.impl.sqlite.SqliteDB"
chroma_producer_impl: str = "chromadb.db.impl.sqlite.SqliteDB"
chroma_consumer_impl: str = "chromadb.db.impl.sqlite.SqliteDB"
clickhouse_host: Optional[str] = None
clickhouse_port: Optional[str] = None
persist_directory: str = ".chroma"
chroma_server_host: Optional[str] = None
chroma_server_http_port: Optional[str] = None
chroma_server_ssl_enabled: Optional[bool] = False
chroma_server_grpc_port: Optional[str] = None
chroma_server_cors_allow_origins: List[str] = [] # eg ["http://localhost:3000"]
anonymized_telemetry: bool = True
allow_reset: bool = False
sqlite_database: Optional[str] = ":memory:"
migrations: Literal["none", "validate", "apply"] = "apply"
def require(self, key: str) -> Any:
"""Return the value of a required config key, or raise an exception if it is not
set"""
val = self[key]
if val is None:
raise ValueError(f"Missing required config value '{key}'")
return val
def __getitem__(self, key: str) -> Any:
val = getattr(self, key)
# Backwards compatibility with short names instead of full class names
if val in _legacy_config_values:
newval = _legacy_config_values[val]
val = newval
return val
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
T = TypeVar("T", bound="Component")
class Component(ABC, EnforceOverrides):
_dependencies: Set["Component"]
_system: "System"
_running: bool
def __init__(self, system: "System"):
self._dependencies = set()
self._system = system
self._running = False
def require(self, type: Type[T]) -> T:
"""Get a Component instance of the given type, and register as a dependency of
that instance."""
inst = self._system.instance(type)
self._dependencies.add(inst)
return inst
def dependencies(self) -> Set["Component"]:
"""Return the full set of components this component depends on."""
return self._dependencies
def stop(self) -> None:
"""Idempotently stop this component's execution and free all associated
resources."""
self._running = False
def start(self) -> None:
"""Idempotently start this component's execution"""
self._running = True
def reset(self) -> None:
"""Reset this component's state to its initial blank state. Only intended to be
called from tests."""
pass
class System(Component):
settings: Settings
_instances: Dict[Type[Component], Component]
def __init__(self, settings: Settings):
self.settings = settings
self._instances = {}
super().__init__(self)
if is_thin_client:
# The thin client is a system with only the API component
if self.settings["chroma_api_impl"] != "chromadb.api.fastapi.FastAPI":
raise RuntimeError(
"Chroma is running in http-only client mode, and can only be run with 'chromadb.api.fastapi.FastAPI' or 'rest' as the chroma_api_impl. \
see https://docs.trychroma.com/usage-guide?lang=py#using-the-python-http-only-client for more information."
)
def instance(self, type: Type[T]) -> T:
"""Return an instance of the component type specified. If the system is running,
the component will be started as well."""
if inspect.isabstract(type):
type_fqn = get_fqn(type)
if type_fqn not in _abstract_type_keys:
raise ValueError(f"Cannot instantiate abstract type: {type}")
key = _abstract_type_keys[type_fqn]
fqn = self.settings.require(key)
type = get_class(fqn, type)
if type not in self._instances:
impl = type(self)
self._instances[type] = impl
if self._running:
impl.start()
inst = self._instances[type]
return cast(T, inst)
def components(self) -> Iterable[Component]:
"""Return the full set of all components and their dependencies in dependency
order."""
sorter: TopologicalSorter[Component] = TopologicalSorter()
for component in self._instances.values():
sorter.add(component, *component.dependencies())
return sorter.static_order()
@override
def start(self) -> None:
super().start()
for component in self.components():
component.start()
@override
def stop(self) -> None:
super().stop()
for component in reversed(list(self.components())):
component.stop()
@override
def reset(self) -> None:
if not self.settings.allow_reset:
raise ValueError("Resetting is not allowed by this configuration")
for component in self.components():
component.reset()
C = TypeVar("C")
def get_class(fqn: str, type: Type[C]) -> Type[C]:
"""Given a fully qualifed class name, import the module and return the class"""
module_name, class_name = fqn.rsplit(".", 1)
module = importlib.import_module(module_name)
cls = getattr(module, class_name)
return cast(Type[C], cls)
def get_fqn(cls: Type[object]) -> str:
"""Given a class, return its fully qualified name"""
return f"{cls.__module__}.{cls.__name__}"
|