Spaces:
Runtime error
Runtime error
from sqlalchemy.engine.default import DefaultDialect | |
from clickhouse_connect import dbapi | |
from clickhouse_connect.cc_sqlalchemy.inspector import ChInspector | |
from clickhouse_connect.cc_sqlalchemy.sql import full_table | |
from clickhouse_connect.cc_sqlalchemy.sql.ddlcompiler import ChDDLCompiler | |
from clickhouse_connect.cc_sqlalchemy import ischema_names, dialect_name | |
from clickhouse_connect.cc_sqlalchemy.sql.preparer import ChIdentifierPreparer | |
from clickhouse_connect.driver.query import quote_identifier, format_str | |
# pylint: disable=too-many-public-methods,no-self-use,unused-argument | |
class ClickHouseDialect(DefaultDialect): | |
""" | |
See :py:class:`sqlalchemy.engine.interfaces` | |
""" | |
name = dialect_name | |
driver = 'connect' | |
default_schema_name = 'default' | |
supports_native_decimal = True | |
supports_native_boolean = True | |
returns_unicode_strings = True | |
postfetch_lastrowid = False | |
ddl_compiler = ChDDLCompiler | |
preparer = ChIdentifierPreparer | |
description_encoding = None | |
max_identifier_length = 127 | |
ischema_names = ischema_names | |
inspector = ChInspector | |
# pylint: disable=method-hidden | |
def dbapi(cls): | |
return dbapi | |
def initialize(self, connection): | |
pass | |
def get_schema_names(connection, **_): | |
return [row.name for row in connection.execute('SHOW DATABASES')] | |
def has_database(connection, db_name): | |
return (connection.execute('SELECT name FROM system.databases ' + | |
f'WHERE name = {format_str(db_name)}')).rowcount > 0 | |
def get_table_names(self, connection, schema=None, **kw): | |
cmd = 'SHOW TABLES' | |
if schema: | |
cmd += ' FROM ' + quote_identifier(schema) | |
return [row.name for row in connection.execute(cmd)] | |
def get_primary_keys(self, connection, table_name, schema=None, **kw): | |
return [] | |
# pylint: disable=arguments-renamed | |
def get_pk_constraint(self, connection, table_name, schema=None, **kw): | |
return [] | |
def get_foreign_keys(self, connection, table_name, schema=None, **kw): | |
return [] | |
def get_temp_table_names(self, connection, schema=None, **kw): | |
return [] | |
def get_view_names(self, connection, schema=None, **kw): | |
return [] | |
def get_temp_view_names(self, connection, schema=None, **kw): | |
return [] | |
def get_view_definition(self, connection, view_name, schema=None, **kw): | |
pass | |
def get_indexes(self, connection, table_name, schema=None, **kw): | |
return [] | |
def get_unique_constraints(self, connection, table_name, schema=None, **kw): | |
return [] | |
def get_check_constraints(self, connection, table_name, schema=None, **kw): | |
return [] | |
def has_table(self, connection, table_name, schema=None, **_kw): | |
result = connection.execute(f'EXISTS TABLE {full_table(table_name, schema)}') | |
row = result.fetchone() | |
return row[0] == 1 | |
def has_sequence(self, connection, sequence_name, schema=None, **_kw): | |
return False | |
def do_begin_twophase(self, connection, xid): | |
raise NotImplementedError | |
def do_prepare_twophase(self, connection, xid): | |
raise NotImplementedError | |
def do_rollback_twophase(self, connection, xid, is_prepared=True, recover=False): | |
raise NotImplementedError | |
def do_commit_twophase(self, connection, xid, is_prepared=True, recover=False): | |
raise NotImplementedError | |
def do_recover_twophase(self, connection): | |
raise NotImplementedError | |
def set_isolation_level(self, dbapi_conn, level): | |
pass | |
def get_isolation_level(self, dbapi_conn): | |
return None | |