File size: 6,004 Bytes
4a51346
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
from typing import Any, Optional, Sequence, Tuple, Type
from types import TracebackType
from typing_extensions import Protocol, Self, Literal
from abc import ABC, abstractmethod
from threading import local
from overrides import override, EnforceOverrides
import pypika
import pypika.queries
from chromadb.config import System, Component
from uuid import UUID
from itertools import islice, count


class NotFoundError(Exception):
    """Signals that a delete or update operation did not affect any rows."""


class UniqueConstraintError(Exception):
    """Signals that an insert operation would break a unique constraint."""


class Cursor(Protocol):
    """Typed description of the DBAPI2 Cursor methods used by this code.

    DBAPI2 itself carries no type information, so this protocol spells out the
    signatures that callers rely on.
    """

    def execute(self, sql: str, params: Optional[Tuple[Any, ...]] = None) -> Self:
        """Execute a single SQL string, optionally with a tuple of parameters.

        Annotated as returning a cursor of the same type.
        """

    def executescript(self, script: str) -> Self:
        """Execute a script given as one string.

        Annotated as returning a cursor of the same type.
        """

    def executemany(
        self, sql: str, params: Optional[Sequence[Tuple[Any, ...]]] = None
    ) -> Self:
        """Execute one SQL string against a sequence of parameter tuples.

        Annotated as returning a cursor of the same type.
        """

    def fetchone(self) -> Tuple[Any, ...]:
        """Return one result row as a tuple.

        NOTE(review): DBAPI2 drivers commonly return None when no row is
        available, which this annotation does not express -- confirm callers
        handle that case.
        """

    def fetchall(self) -> Sequence[Tuple[Any, ...]]:
        """Return the result rows as a sequence of tuples."""


class TxWrapper(ABC, EnforceOverrides):
    """Abstract context manager wrapping a DBAPI 2.0 Connection so that clients
    can implement transactions.

    Implementations provide two guarantees that plain DBAPI 2.0 connections
    do not:

    - entering the context consistently yields a Cursor object (some
      connections yield the Connection instead)
    - an exception thrown from the body is always re-raised
    """

    @abstractmethod
    def __enter__(self) -> Cursor:
        """Enter the context and return the Cursor to use inside the body."""

    @abstractmethod
    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> Literal[False]:
        """Leave the context.

        Receives the exception type, value and traceback from the body (all
        None when the body finished normally). The return type is
        ``Literal[False]``: an exception from the body is never suppressed.
        """


class SqlDB(Component):
    """Wrapper around the DBAPI 2.0 interface that keeps behavior consistent
    across database implementations."""

    def __init__(self, system: System):
        super().__init__(system)

    @abstractmethod
    def tx(self) -> TxWrapper:
        """Return a transaction wrapper"""

    @staticmethod
    @abstractmethod
    def querybuilder() -> Type[pypika.Query]:
        """Return the PyPika Query builder subtype suited to this database
        implementation (see
        https://pypika.readthedocs.io/en/latest/3_advanced.html#handling-different-database-platforms)
        """

    @staticmethod
    @abstractmethod
    def parameter_format() -> str:
        """Return the parameter format string for this database implementation.

        It is used with ``str.format(i)``, where ``i`` is the numeric index of
        the parameter.
        """

    @staticmethod
    @abstractmethod
    def uuid_to_db(uuid: Optional[UUID]) -> Optional[Any]:
        """Convert a UUID into a value that can be handed to the DB driver"""

    @staticmethod
    @abstractmethod
    def uuid_from_db(value: Optional[Any]) -> Optional[UUID]:
        """Convert a value received from the DB driver into a UUID"""

    @staticmethod
    @abstractmethod
    def unique_constraint_error() -> Type[BaseException]:
        """Return the exception type the DB raises when a unique constraint is
        violated"""

    def param(self, idx: int) -> pypika.Parameter:
        """Return a PyPika Parameter object for the given index"""
        # Render the implementation's placeholder for this index first, then
        # wrap it in a Parameter.
        placeholder = self.parameter_format().format(idx)
        return pypika.Parameter(placeholder)


# Thread-local scratch space shared by the module-level get_sql() and
# ParameterValue.get_sql(): get_sql() resets `values`, `generator` and
# `formatstr` on it before rendering a query, and each ParameterValue appends
# its value(s) to `values` and draws placeholder indexes from `generator`.
_context = local()


class ParameterValue(pypika.Parameter):  # type: ignore
    """
    PyPika Parameter wrapper that carries its value inline, so parameter values
    can be written directly where they are used while building a query. The
    values are collected when the query is rendered through this module's
    get_sql() function.
    """

    def __init__(self, value: Any):
        # NOTE(review): the base class __init__ is not invoked here -- confirm
        # nothing relies on attributes it would otherwise set.
        self.value = value

    @override
    def get_sql(self, **kwargs: Any) -> str:
        """Record this parameter's value(s) in the thread-local context and
        return the placeholder text to embed in the SQL.

        A list or tuple value contributes one placeholder per element,
        comma-separated and wrapped in parentheses; any other value contributes
        a single placeholder. Relies on the context attributes (`values`,
        `generator`, `formatstr`) that the module-level get_sql() sets up.
        """
        bound = self.value

        if not isinstance(bound, (list, tuple)):
            # Scalar: one value, one placeholder.
            _context.values.append(bound)
            return str(_context.formatstr.format(next(_context.generator)))

        # Sequence: register every element, then draw exactly as many indexes
        # from the shared counter as there are elements.
        _context.values.extend(bound)
        numbered = islice(_context.generator, len(bound))
        rendered = [_context.formatstr.format(i) for i in numbered]
        return str("(" + ", ".join(rendered) + ")")


def get_sql(
    query: pypika.queries.QueryBuilder, formatstr: str = "?"
) -> Tuple[str, Tuple[Any, ...]]:
    """
    Render a PyPika query and return both the SQL string and its parameters.

    This wraps the query's own get_sql() so that parameter values can be written
    inline while the query is built. The result is a tuple of
    (SQL string, parameter tuple), with the parameter values in the same order
    as their placeholders appear in the emitted SQL, so the parameter vector
    never has to be assembled by hand.

    For this to work, values must be wrapped in the ParameterValue class defined
    in this module rather than the base pypika.Parameter class.

    Usage Example:

        q = (
            pypika.Query.from_("table")
            .select("col1")
            .where(pypika.Field("col2") == ParameterValue("foo"))
            .where(pypika.Field("col3") == ParameterValue("bar"))
        )

        sql, params = get_sql(q)

        cursor.execute(sql, params)

    The format string must match the parameter format of the database being
    used. It is applied as str.format(i), where i is the numeric index of the
    parameter, counting from 1. For example, a driver that expects numbered
    placeholders such as `:1`, `:2`, etc. needs the format string `":{}"`; the
    default `"?"` ignores the index.

    See https://pypika.readthedocs.io/en/latest/2_tutorial.html#parametrized-queries for more
    information on parameterized queries in PyPika.
    """

    # Reset the thread-local state that ParameterValue.get_sql() reads and
    # fills in while the query is being rendered.
    _context.formatstr = formatstr
    _context.generator = count(1)
    _context.values = []

    rendered = query.get_sql()
    return rendered, tuple(_context.values)