Use constats for SQL g. variables to avoid any typo

This commit is contained in:
Gregoo 2025-01-21 11:27:27 +01:00
parent a6ab53efa7
commit 14bc9cef26

View File

@ -15,6 +15,11 @@ from .version import __database_version__
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
G_CONNECTION: Final[str] = 'database_connection'
G_ENVIRONMENT: Final[str] = 'database_environment'
G_DEFER: Final[str] = 'database_defer'
G_STATS: Final[str] = 'database_stats'
COUNTERS: Final[list[BrickCounter]] = [ COUNTERS: Final[list[BrickCounter]] = [
BrickCounter('Sets', 'sets', icon='hashtag'), BrickCounter('Sets', 'sets', icon='hashtag'),
BrickCounter('Minifigures', 'minifigures', icon='group-line'), BrickCounter('Minifigures', 'minifigures', icon='group-line'),
@ -34,12 +39,12 @@ class BrickSQL(object):
# Instantiate the database connection in the Flask # Instantiate the database connection in the Flask
# application context so that it can be used by all # application context so that it can be used by all
# requests without re-opening connections # requests without re-opening connections
database = getattr(g, 'database', None) connection = getattr(g, G_CONNECTION, None)
# Grab the existing connection if it exists # Grab the existing connection if it exists
if database is not None: if connection is not None:
self.connection = database self.connection = connection
self.stats = getattr(g, 'database_stats', BrickSQLStats()) self.stats = getattr(g, G_STATS, BrickSQLStats())
# Grab a cursor # Grab a cursor
self.cursor = self.connection.cursor() self.cursor = self.connection.cursor()
@ -76,6 +81,14 @@ class BrickSQL(object):
error=str(e) error=str(e)
)) ))
# Debug: Attach the debugger
# Uncomment manually because this is ultra verbose
# self.connection.set_trace_callback(print)
# Save the connection globally for later use
setattr(g, G_CONNECTION, self.connection)
setattr(g, G_STATS, self.stats)
if not failsafe: if not failsafe:
if self.needs_upgrade(): if self.needs_upgrade():
raise DatabaseException('Your database need to be upgraded from version {version} to version {required}'.format( # noqa: E501 raise DatabaseException('Your database need to be upgraded from version {version} to version {required}'.format( # noqa: E501
@ -83,17 +96,9 @@ class BrickSQL(object):
required=__database_version__, required=__database_version__,
)) ))
# Debug: Attach the debugger
# Uncomment manually because this is ultra verbose
# self.connection.set_trace_callback(print)
# Save the connection globally for later use
g.database = self.connection
g.database_stats = self.stats
# Clear the defer stack # Clear the defer stack
def clear_defer(self, /) -> None: def clear_defer(self, /) -> None:
g.database_defer = [] setattr(g, G_DEFER, [])
# Shorthand to commit # Shorthand to commit
def commit(self, /) -> None: def commit(self, /) -> None:
@ -129,7 +134,7 @@ class BrickSQL(object):
defer.append((query, parameters)) defer.append((query, parameters))
# Save the defer stack # Save the defer stack
g.database_defer = defer setattr(g, G_DEFER, defer)
# Shorthand to execute, returning number of affected rows # Shorthand to execute, returning number of affected rows
def execute( def execute(
@ -229,18 +234,14 @@ class BrickSQL(object):
# Grab the defer stack # Grab the defer stack
def get_defer(self, /) -> list[Tuple[str, dict[str, Any]]]: def get_defer(self, /) -> list[Tuple[str, dict[str, Any]]]:
defer: list[Tuple[str, dict[str, Any]]] = getattr( defer: list[Tuple[str, dict[str, Any]]] = getattr(g, G_DEFER, [])
g,
'database_defer',
[]
)
return defer return defer
# Load a query by name # Load a query by name
def load_query(self, name: str, /, **context) -> str: def load_query(self, name: str, /, **context) -> str:
# Grab the existing environment if it exists # Grab the existing environment if it exists
environment = getattr(g, 'database_loader', None) environment = getattr(g, G_ENVIRONMENT, None)
# Instantiate Jinja environment for SQL files # Instantiate Jinja environment for SQL files
if environment is None: if environment is None:
@ -251,7 +252,7 @@ class BrickSQL(object):
) )
# Save the environment globally for later use # Save the environment globally for later use
g.database_environment = environment setattr(g, G_ENVIRONMENT, environment)
# Grab the template # Grab the template
logger.debug('SQLite: loading {name} (context: {context})'.format( logger.debug('SQLite: loading {name} (context: {context})'.format(
@ -346,11 +347,11 @@ class BrickSQL(object):
# Close all existing SQLite3 connections # Close all existing SQLite3 connections
def close() -> None: def close() -> None:
database: sqlite3.Connection | None = getattr(g, 'database', None) connection: sqlite3.Connection | None = getattr(g, G_CONNECTION, None)
if database is not None: if connection is not None:
logger.debug('SQLite3: close') logger.debug('SQLite3: close')
database.close() connection.close()
# Remove the database from the context # Remove the database from the context
delattr(g, 'database') delattr(g, G_CONNECTION)