File size: 3,214 Bytes
4a51346
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
from abc import abstractmethod
from dataclasses import asdict, dataclass
import os
from typing import Callable, ClassVar, Dict, Any
import uuid
import time
from threading import Event, Thread
import chromadb
from chromadb.config import Component
from pathlib import Path
from enum import Enum

# Names of the settings whose values are copied into the telemetry context
# built by Telemetry.context; that property looks up each name listed here.
TELEMETRY_WHITELISTED_SETTINGS = [
    "chroma_db_impl",
    "chroma_api_impl",
    "chroma_server_ssl_enabled",
]


class ServerContext(Enum):
    """Server context label reported with telemetry.

    The member's ``value`` is what ``Telemetry.context`` stores under the
    ``"server_context"`` key.
    """

    # Default used by Telemetry.SERVER_CONTEXT.
    NONE = "None"
    # NOTE(review): presumably selected when running under the FastAPI
    # server; nothing in this module sets it, so confirm against the caller.
    FASTAPI = "FastAPI"


@dataclass
class TelemetryEvent:
    """Base dataclass for telemetry events.

    ``name`` is declared as a class-level attribute (``ClassVar``), so it is
    not a dataclass field and does not appear in ``properties``.
    """

    name: ClassVar[str]

    @property
    def properties(self) -> Dict[str, Any]:
        """Return this event's dataclass fields as a plain dictionary."""
        field_values: Dict[str, Any] = asdict(self)
        return field_values


class RepeatedTelemetry:
    """Call ``function`` repeatedly on a background daemon thread.

    The worker thread is started by the constructor. Ticks are aligned to
    multiples of ``interval`` measured from construction time: each wait lasts
    until the next multiple, so time spent inside ``function`` does not
    accumulate as drift (a call that overruns a boundary simply waits for the
    following one).
    """

    def __init__(self, interval: int, function: Callable[[], None]):
        """Start calling ``function`` every ``interval`` seconds.

        Args:
            interval: Seconds between calls; must be strictly positive.
            function: Zero-argument callable run on the worker thread.

        Raises:
            ValueError: If ``interval`` is not strictly positive.
        """
        # Reject bad intervals before any thread exists: zero would make the
        # modulo in `_time` raise inside the worker, and a negative value
        # would produce a negative wait timeout, i.e. a busy loop.
        if interval <= 0:
            raise ValueError(
                f"interval must be a positive number of seconds, got {interval!r}"
            )
        self.interval = interval
        self.function = function
        self.start = time.time()
        self.event = Event()
        # Daemon thread so a pending wait never blocks interpreter shutdown.
        self.thread = Thread(target=self._target, daemon=True)
        self.thread.start()

    def _target(self) -> None:
        """Worker loop: run ``function`` after each wait until stopped."""
        # Event.wait returns False on timeout (time to fire) and True once
        # stop() has set the event, which ends the loop.
        while not self.event.wait(self._time):
            self.function()

    @property
    def _time(self) -> float:
        """Seconds remaining until the next multiple of ``interval``."""
        return self.interval - ((time.time() - self.start) % self.interval)

    def stop(self) -> None:
        """Signal the worker to exit and wait for it to finish."""
        self.event.set()
        self.thread.join()


class Telemetry(Component):
    """Base class for telemetry backends.

    Subclasses implement ``capture``. This class supplies the shared pieces:
    periodic event scheduling, the context dictionary describing the running
    installation, and an anonymous user id persisted at ``USER_ID_PATH``.
    """

    # File in which the generated user id is stored between runs.
    USER_ID_PATH = str(Path.home() / ".cache" / "chroma" / "telemetry_user_id")
    # Returned by `user_id` when the id file cannot be read or written.
    UNKNOWN_USER_ID = "UNKNOWN"
    SERVER_CONTEXT: ServerContext = ServerContext.NONE
    # Cached user id; None until `user_id` is first resolved.
    _curr_user_id = None

    @abstractmethod
    def capture(self, event: TelemetryEvent) -> None:
        """Record ``event``; implemented by concrete telemetry backends."""
        pass

    # Schedule a function that creates a TelemetryEvent to be called every `every_seconds` seconds.
    def schedule_event_function(
        self, event_function: Callable[..., TelemetryEvent], every_seconds: int
    ) -> None:
        """Capture the event returned by ``event_function`` periodically.

        Args:
            event_function: Called with no arguments on each tick; its return
                value is passed to ``capture``.
            every_seconds: Seconds between ticks.
        """
        # The RepeatedTelemetry starts its own daemon thread on construction;
        # no reference to it is kept here.
        RepeatedTelemetry(every_seconds, lambda: self.capture(event_function()))

    @property
    def context(self) -> Dict[str, Any]:
        """Return the context dictionary attached to telemetry.

        Contains the chromadb version, the server context value and the
        current value of every setting named in
        ``TELEMETRY_WHITELISTED_SETTINGS``. It is rebuilt on each access and
        also stored on ``self._context``.
        """
        settings = chromadb.get_settings()
        self._context = {
            "chroma_version": chromadb.__version__,
            "server_context": self.SERVER_CONTEXT.value,
            **{name: settings[name] for name in TELEMETRY_WHITELISTED_SETTINGS},
        }
        return self._context

    @property
    def user_id(self) -> str:
        """Return the persisted anonymous user id, creating it if needed.

        The id is read from ``USER_ID_PATH``. If the file is missing, or
        exists but holds only whitespace, a new UUID4 is generated and
        written there. The result is cached on the instance. If the file
        cannot be accessed, ``UNKNOWN_USER_ID`` is returned instead.
        """
        if self._curr_user_id:
            return self._curr_user_id

        # File access may fail due to permissions or other reasons. We don't want to crash so we catch all exceptions.
        try:
            stored_id = ""
            if os.path.exists(self.USER_ID_PATH):
                with open(self.USER_ID_PATH, "r") as f:
                    # Strip so a trailing newline never becomes part of the id.
                    stored_id = f.read().strip()
            if not stored_id:
                # Missing or blank file (e.g. truncated by an interrupted
                # write): mint a fresh id and persist it, rather than
                # reporting an empty id that would also never be cached.
                stored_id = str(uuid.uuid4())
                os.makedirs(os.path.dirname(self.USER_ID_PATH), exist_ok=True)
                with open(self.USER_ID_PATH, "w") as f:
                    f.write(stored_id)
            self._curr_user_id = stored_id
        except Exception:
            self._curr_user_id = self.UNKNOWN_USER_ID
        return self._curr_user_id