BrickTracker/bricktracker/record.py

91 lines
2.1 KiB
Python
Raw Normal View History

2025-01-17 11:03:00 +01:00
from sqlite3 import Row
from typing import Any, ItemsView, Tuple
2025-01-17 11:03:00 +01:00
from .fields import BrickRecordFields
from .sql import BrickSQL
# SQLite record
class BrickRecord(object):
    """Base record whose fields are loaded from and stored to SQLite.

    The class only declares ``select_query`` and ``insert_query``; it never
    assigns them, so they must be provided elsewhere (presumably by
    subclasses -- confirm against the rest of the project).
    """

    select_query: str
    insert_query: str

    # Container object holding the record values as plain attributes
    fields: BrickRecordFields

    def __init__(self, /):
        """Start with an empty set of fields."""
        self.fields = BrickRecordFields()

    def ingest(self, record: Row | dict[str, Any], /) -> None:
        """Copy every key of ``record`` onto ``self.fields`` as an attribute.

        No filtering or validation is performed: each key returned by
        ``record.keys()`` becomes (or overwrites) an attribute of the fields
        object.
        """
        for column in record.keys():
            setattr(self.fields, column, record[column])

    def insert(
        self,
        /,
        *,
        commit=True,
        no_defer=False,
        override_query: str | None = None
    ) -> Tuple[int, str]:
        """Run the insert query with the current fields as parameters.

        ``override_query`` replaces ``self.insert_query`` when it is truthy.
        The execute() call is flagged as deferred only when ``commit`` and
        ``no_defer`` are both false; when ``commit`` is true the database is
        committed right after the call.

        Returns the two values handed back by ``BrickSQL.execute()``.
        """
        statement = override_query or self.insert_query

        database = BrickSQL()
        affected, executed = database.execute(
            statement,
            parameters=self.sql_parameters(),
            # Equivalent to: not commit and not no_defer
            defer=not (commit or no_defer),
        )

        if commit:
            database.commit()

        return affected, executed

    def items(self, /) -> ItemsView[str, Any]:
        """Return a view over the (name, value) pairs of the fields."""
        attributes = self.fields.__dict__
        return attributes.items()

    def select(
        self,
        /,
        *,
        override_query: str | None = None,
        **context: Any
    ) -> bool:
        """Fetch one row with the select query and ingest it.

        ``override_query`` replaces ``self.select_query`` when it is truthy.
        The current fields are sent as SQL parameters and ``context`` is
        forwarded untouched to ``BrickSQL.fetchone()``.

        Returns True when a row was found and ingested, False otherwise.
        """
        statement = override_query or self.select_query

        row = BrickSQL().fetchone(
            statement,
            parameters=self.sql_parameters(),
            **context
        )

        # Nothing matched: leave the fields as they are
        if row is None:
            return False

        self.ingest(row)
        return True

    def sql_parameters(self, /) -> dict[str, Any]:
        """Return a new dict mapping each field name to its current value."""
        return dict(self.items())