SungBeom's picture
Upload folder using huggingface_hub
4a51346
raw
history blame
3.21 kB
from abc import abstractmethod
from dataclasses import asdict, dataclass
import os
from typing import Callable, ClassVar, Dict, Any
import uuid
import time
from threading import Event, Thread
import chromadb
from chromadb.config import Component
from pathlib import Path
from enum import Enum
TELEMETRY_WHITELISTED_SETTINGS = [
"chroma_db_impl",
"chroma_api_impl",
"chroma_server_ssl_enabled",
]
class ServerContext(Enum):
NONE = "None"
FASTAPI = "FastAPI"
@dataclass
class TelemetryEvent:
name: ClassVar[str]
@property
def properties(self) -> Dict[str, Any]:
return asdict(self)
class RepeatedTelemetry:
def __init__(self, interval: int, function: Callable[[], None]):
self.interval = interval
self.function = function
self.start = time.time()
self.event = Event()
self.thread = Thread(target=self._target)
self.thread.daemon = True
self.thread.start()
def _target(self) -> None:
while not self.event.wait(self._time):
self.function()
@property
def _time(self) -> float:
return self.interval - ((time.time() - self.start) % self.interval)
def stop(self) -> None:
self.event.set()
self.thread.join()
class Telemetry(Component):
USER_ID_PATH = str(Path.home() / ".cache" / "chroma" / "telemetry_user_id")
UNKNOWN_USER_ID = "UNKNOWN"
SERVER_CONTEXT: ServerContext = ServerContext.NONE
_curr_user_id = None
@abstractmethod
def capture(self, event: TelemetryEvent) -> None:
pass
# Schedule a function that creates a TelemetryEvent to be called every `every_seconds` seconds.
def schedule_event_function(
self, event_function: Callable[..., TelemetryEvent], every_seconds: int
) -> None:
RepeatedTelemetry(every_seconds, lambda: self.capture(event_function()))
@property
def context(self) -> Dict[str, Any]:
chroma_version = chromadb.__version__
settings = chromadb.get_settings()
telemetry_settings = {}
for whitelisted in TELEMETRY_WHITELISTED_SETTINGS:
telemetry_settings[whitelisted] = settings[whitelisted]
self._context = {
"chroma_version": chroma_version,
"server_context": self.SERVER_CONTEXT.value,
**telemetry_settings,
}
return self._context
@property
def user_id(self) -> str:
if self._curr_user_id:
return self._curr_user_id
# File access may fail due to permissions or other reasons. We don't want to crash so we catch all exceptions.
try:
if not os.path.exists(self.USER_ID_PATH):
os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
with open(self.USER_ID_PATH, "w") as f:
new_user_id = str(uuid.uuid4())
f.write(new_user_id)
self._curr_user_id = new_user_id
else:
with open(self.USER_ID_PATH, "r") as f:
self._curr_user_id = f.read()
except Exception:
self._curr_user_id = self.UNKNOWN_USER_ID
return self._curr_user_id