BrickTracker/bricktracker/record.py

89 lines
2.0 KiB
Python
Raw Normal View History

2025-01-17 11:03:00 +01:00
from sqlite3 import Row
2025-01-28 23:07:12 +01:00
from typing import Any, ItemsView
2025-01-17 11:03:00 +01:00
from .fields import BrickRecordFields
from .sql import BrickSQL
# SQLite record
class BrickRecord(object):
select_query: str
insert_query: str
# Fields
fields: BrickRecordFields
def __init__(self, /):
self.fields = BrickRecordFields()
# Load from a record
def ingest(self, record: Row | dict[str, Any], /) -> None:
# Brutally ingest the record
for key in record.keys():
setattr(self.fields, key, record[key])
# Insert into the database
# If we do not commit immediately, we defer the execute() call
def insert(
self,
/,
*,
commit=True,
no_defer=False,
override_query: str | None = None
2025-01-28 23:07:12 +01:00
) -> None:
if override_query:
query = override_query
else:
query = self.insert_query
2025-01-17 11:03:00 +01:00
database = BrickSQL()
2025-01-28 23:07:12 +01:00
database.execute(
query,
2025-01-17 11:03:00 +01:00
parameters=self.sql_parameters(),
defer=not commit and not no_defer,
2025-01-17 11:03:00 +01:00
)
if commit:
database.commit()
# Shorthand to field items
def items(self, /) -> ItemsView[str, Any]:
return self.fields.__dict__.items()
# Get from the database using the query
def select(
self,
/,
*,
override_query: str | None = None,
**context: Any
) -> bool:
2025-01-17 11:03:00 +01:00
if override_query:
query = override_query
else:
query = self.select_query
record = BrickSQL().fetchone(
2025-01-17 11:03:00 +01:00
query,
parameters=self.sql_parameters(),
**context
2025-01-17 11:03:00 +01:00
)
# Ingest the record
if record is not None:
self.ingest(record)
return True
else:
return False
2025-01-17 11:03:00 +01:00
# Generic SQL parameters from fields
def sql_parameters(self, /) -> dict[str, Any]:
parameters: dict[str, Any] = {}
for name, value in self.items():
parameters[name] = value
return parameters