317 lines
8.5 KiB
Python
317 lines
8.5 KiB
Python
import logging
|
|
import os
|
|
import sqlite3
|
|
from typing import Any, Tuple
|
|
|
|
from .sql_stats import BrickSQLStats
|
|
|
|
from flask import current_app, g
|
|
from jinja2 import Environment, FileSystemLoader
|
|
from werkzeug.datastructures import FileStorage
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# SQLite3 client with our extra features
|
|
class BrickSQL(object):
|
|
connection: sqlite3.Connection
|
|
cursor: sqlite3.Cursor
|
|
stats: BrickSQLStats
|
|
|
|
def __init__(self, /):
|
|
# Instantiate the database connection in the Flask
|
|
# application context so that it can be used by all
|
|
# requests without re-opening connections
|
|
database = getattr(g, 'database', None)
|
|
|
|
# Grab the existing connection if it exists
|
|
if database is not None:
|
|
self.connection = database
|
|
self.stats = getattr(g, 'database_stats', BrickSQLStats())
|
|
else:
|
|
# Instantiate the stats
|
|
self.stats = BrickSQLStats()
|
|
|
|
# Stats: connect
|
|
self.stats.connect += 1
|
|
|
|
logger.debug('SQLite3: connect')
|
|
self.connection = sqlite3.connect(
|
|
current_app.config['DATABASE_PATH'].value
|
|
)
|
|
|
|
# Setup the row factory to get pseudo-dicts rather than tuples
|
|
self.connection.row_factory = sqlite3.Row
|
|
|
|
# Debug: Attach the debugger
|
|
# Uncomment manually because this is ultra verbose
|
|
# self.connection.set_trace_callback(print)
|
|
|
|
# Save the connection globally for later use
|
|
g.database = self.connection
|
|
g.database_stats = self.stats
|
|
|
|
# Grab a cursor
|
|
self.cursor = self.connection.cursor()
|
|
|
|
# Clear the defer stack
|
|
def clear_defer(self, /) -> None:
|
|
g.database_defer = []
|
|
|
|
# Shorthand to commit
|
|
def commit(self, /) -> None:
|
|
# Stats: commit
|
|
self.stats.commit += 1
|
|
|
|
# Process the defered stack
|
|
for item in self.get_defer():
|
|
self.raw_execute(item[0], item[1])
|
|
|
|
self.clear_defer()
|
|
|
|
logger.debug('SQLite3: commit')
|
|
return self.connection.commit()
|
|
|
|
# Defer a call to execute
|
|
def defer(self, query: str, parameters: dict[str, Any], /):
|
|
defer = self.get_defer()
|
|
|
|
logger.debug('SQLite3: defer execute')
|
|
|
|
# Add the query and parameters to the defer stack
|
|
defer.append((query, parameters))
|
|
|
|
# Save the defer stack
|
|
g.database_defer = defer
|
|
|
|
# Shorthand to execute, returning number of affected rows
|
|
def execute(
|
|
self,
|
|
query: str,
|
|
/,
|
|
parameters: dict[str, Any] = {},
|
|
defer: bool = False,
|
|
**context,
|
|
) -> Tuple[int, str]:
|
|
# Stats: execute
|
|
self.stats.execute += 1
|
|
|
|
# Load the query
|
|
query = self.load_query(query, **context)
|
|
|
|
# Defer
|
|
if defer:
|
|
self.defer(query, parameters)
|
|
|
|
return -1, query
|
|
else:
|
|
result = self.raw_execute(query, parameters)
|
|
|
|
# Stats: changed
|
|
if result.rowcount > 0:
|
|
self.stats.changed += result.rowcount
|
|
|
|
return result.rowcount, query
|
|
|
|
# Shorthand to executescript
|
|
def executescript(self, query: str, /, **context) -> None:
|
|
# Load the query
|
|
query = self.load_query(query, **context)
|
|
|
|
# Stats: executescript
|
|
self.stats.executescript += 1
|
|
|
|
logger.debug('SQLite3: executescript')
|
|
self.cursor.executescript(query)
|
|
|
|
# Shorthand to execute and commit
|
|
def execute_and_commit(
|
|
self,
|
|
query: str,
|
|
/,
|
|
parameters: dict[str, Any] = {},
|
|
**context,
|
|
) -> Tuple[int, str]:
|
|
rows, query = self.execute(query, parameters=parameters, **context)
|
|
self.commit()
|
|
|
|
return rows, query
|
|
|
|
# Shorthand to execute and fetchall
|
|
def fetchall(
|
|
self,
|
|
query: str,
|
|
/,
|
|
parameters: dict[str, Any] = {},
|
|
**context,
|
|
) -> list[sqlite3.Row]:
|
|
_, query = self.execute(query, parameters=parameters, **context)
|
|
|
|
# Stats: fetchall
|
|
self.stats.fetchall += 1
|
|
|
|
logger.debug('SQLite3: fetchall: {query}'.format(
|
|
query=BrickSQL.clean_query(query)
|
|
))
|
|
records = self.cursor.fetchall()
|
|
|
|
# Stats: fetched
|
|
self.stats.fetched += len(records)
|
|
|
|
return records
|
|
|
|
# Shorthand to execute and fetchone
|
|
def fetchone(
|
|
self,
|
|
query: str,
|
|
/,
|
|
parameters: dict[str, Any] = {},
|
|
**context,
|
|
) -> sqlite3.Row | None:
|
|
_, query = self.execute(query, parameters=parameters, **context)
|
|
|
|
# Stats: fetchone
|
|
self.stats.fetchone += 1
|
|
|
|
logger.debug('SQLite3: fetchone: {query}'.format(
|
|
query=BrickSQL.clean_query(query)
|
|
))
|
|
record = self.cursor.fetchone()
|
|
|
|
# Stats: fetched
|
|
if record is not None:
|
|
self.stats.fetched += len(record)
|
|
|
|
return record
|
|
|
|
# Grab the defer stack
|
|
def get_defer(self, /) -> list[Tuple[str, dict[str, Any]]]:
|
|
defer: list[Tuple[str, dict[str, Any]]] = getattr(
|
|
g,
|
|
'database_defer',
|
|
[]
|
|
)
|
|
|
|
return defer
|
|
|
|
# Load a query by name
|
|
def load_query(self, name: str, /, **context) -> str:
|
|
# Grab the existing environment if it exists
|
|
environment = getattr(g, 'database_loader', None)
|
|
|
|
# Instantiate Jinja environment for SQL files
|
|
if environment is None:
|
|
environment = Environment(
|
|
loader=FileSystemLoader(
|
|
os.path.join(os.path.dirname(__file__), 'sql/')
|
|
)
|
|
)
|
|
|
|
# Save the environment globally for later use
|
|
g.database_environment = environment
|
|
|
|
# Grab the template
|
|
logger.debug('SQLite: loading {name} (context: {context})'.format(
|
|
name=name,
|
|
context=context,
|
|
))
|
|
template = environment.get_template('{name}.sql'.format(
|
|
name=name,
|
|
))
|
|
|
|
return template.render(**context)
|
|
|
|
# Raw execute the query without any options
|
|
def raw_execute(
|
|
self,
|
|
query: str,
|
|
parameters: dict[str, Any]
|
|
) -> sqlite3.Cursor:
|
|
logger.debug('SQLite3: execute: {query}'.format(
|
|
query=BrickSQL.clean_query(query)
|
|
))
|
|
|
|
return self.cursor.execute(query, parameters)
|
|
|
|
# Clean the query for debugging
|
|
@staticmethod
|
|
def clean_query(query: str, /) -> str:
|
|
cleaned: list[str] = []
|
|
|
|
for line in query.splitlines():
|
|
# Keep the non-comment side
|
|
line, sep, comment = line.partition('--')
|
|
|
|
# Clean the non-comment side
|
|
line = line.strip()
|
|
|
|
if line:
|
|
cleaned.append(line)
|
|
|
|
return ' '.join(cleaned)
|
|
|
|
# Delete the database
|
|
@staticmethod
|
|
def delete() -> None:
|
|
os.remove(current_app.config['DATABASE_PATH'].value)
|
|
|
|
# Info
|
|
logger.info('The database has been deleted')
|
|
|
|
# Drop the database
|
|
@staticmethod
|
|
def drop() -> None:
|
|
BrickSQL().executescript('schema/drop')
|
|
|
|
# Info
|
|
logger.info('The database has been dropped')
|
|
|
|
# Count the database records
|
|
@staticmethod
|
|
def count_records() -> dict[str, int]:
|
|
database = BrickSQL()
|
|
|
|
counters: dict[str, int] = {}
|
|
for table in ['sets', 'minifigures', 'inventory', 'missing']:
|
|
record = database.fetchone('schema/count', table=table)
|
|
|
|
if record is not None:
|
|
counters[table] = record['count']
|
|
|
|
return counters
|
|
|
|
# Initialize the database
|
|
@staticmethod
|
|
def initialize() -> None:
|
|
BrickSQL().executescript('migrations/init')
|
|
|
|
# Info
|
|
logger.info('The database has been initialized')
|
|
|
|
# Check if the database is initialized
|
|
@staticmethod
|
|
def is_init() -> bool:
|
|
return BrickSQL().fetchone('schema/is_init') is not None
|
|
|
|
# Replace the database with a new file
|
|
@staticmethod
|
|
def upload(file: FileStorage, /) -> None:
|
|
file.save(current_app.config['DATABASE_PATH'].value)
|
|
|
|
# Info
|
|
logger.info('The database has been imported using file {file}'.format(
|
|
file=file.filename
|
|
))
|
|
|
|
|
|
# Close all existing SQLite3 connections
|
|
def close() -> None:
|
|
database: sqlite3.Connection | None = getattr(g, 'database', None)
|
|
|
|
if database is not None:
|
|
logger.debug('SQLite3: close')
|
|
database.close()
|
|
|
|
# Remove the database from the context
|
|
delattr(g, 'database')
|