Spaces:
Runtime error
Runtime error
File size: 3,214 Bytes
4a51346 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 |
from abc import abstractmethod
from dataclasses import asdict, dataclass
import os
from typing import Callable, ClassVar, Dict, Any
import uuid
import time
from threading import Event, Thread
import chromadb
from chromadb.config import Component
from pathlib import Path
from enum import Enum
TELEMETRY_WHITELISTED_SETTINGS = [
"chroma_db_impl",
"chroma_api_impl",
"chroma_server_ssl_enabled",
]
class ServerContext(Enum):
NONE = "None"
FASTAPI = "FastAPI"
@dataclass
class TelemetryEvent:
name: ClassVar[str]
@property
def properties(self) -> Dict[str, Any]:
return asdict(self)
class RepeatedTelemetry:
def __init__(self, interval: int, function: Callable[[], None]):
self.interval = interval
self.function = function
self.start = time.time()
self.event = Event()
self.thread = Thread(target=self._target)
self.thread.daemon = True
self.thread.start()
def _target(self) -> None:
while not self.event.wait(self._time):
self.function()
@property
def _time(self) -> float:
return self.interval - ((time.time() - self.start) % self.interval)
def stop(self) -> None:
self.event.set()
self.thread.join()
class Telemetry(Component):
USER_ID_PATH = str(Path.home() / ".cache" / "chroma" / "telemetry_user_id")
UNKNOWN_USER_ID = "UNKNOWN"
SERVER_CONTEXT: ServerContext = ServerContext.NONE
_curr_user_id = None
@abstractmethod
def capture(self, event: TelemetryEvent) -> None:
pass
# Schedule a function that creates a TelemetryEvent to be called every `every_seconds` seconds.
def schedule_event_function(
self, event_function: Callable[..., TelemetryEvent], every_seconds: int
) -> None:
RepeatedTelemetry(every_seconds, lambda: self.capture(event_function()))
@property
def context(self) -> Dict[str, Any]:
chroma_version = chromadb.__version__
settings = chromadb.get_settings()
telemetry_settings = {}
for whitelisted in TELEMETRY_WHITELISTED_SETTINGS:
telemetry_settings[whitelisted] = settings[whitelisted]
self._context = {
"chroma_version": chroma_version,
"server_context": self.SERVER_CONTEXT.value,
**telemetry_settings,
}
return self._context
@property
def user_id(self) -> str:
if self._curr_user_id:
return self._curr_user_id
# File access may fail due to permissions or other reasons. We don't want to crash so we catch all exceptions.
try:
if not os.path.exists(self.USER_ID_PATH):
os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
with open(self.USER_ID_PATH, "w") as f:
new_user_id = str(uuid.uuid4())
f.write(new_user_id)
self._curr_user_id = new_user_id
else:
with open(self.USER_ID_PATH, "r") as f:
self._curr_user_id = f.read()
except Exception:
self._curr_user_id = self.UNKNOWN_USER_ID
return self._curr_user_id
|