Fixed smart-lists so number are treated like int and not string
This commit is contained in:
184
app/db.py
184
app/db.py
@@ -4,7 +4,7 @@ from __future__ import annotations
|
||||
import re
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Tuple
|
||||
from typing import Any, Dict, List, Tuple, Optional
|
||||
|
||||
DB_PATH = Path("/data/library.db")
|
||||
|
||||
@@ -311,7 +311,8 @@ def search_count(conn: sqlite3.Connection, q: str) -> int:
|
||||
|
||||
# ----------------------------- Smart Lists ------------------------------------
|
||||
|
||||
FIELD_MAP = {
|
||||
# Map external field names to DB columns
|
||||
FIELD_MAP: Dict[str, str] = {
|
||||
"title": "m.title",
|
||||
"series": "m.series",
|
||||
"number": "m.number",
|
||||
@@ -331,85 +332,107 @@ FIELD_MAP = {
|
||||
"name": "i.name",
|
||||
}
|
||||
|
||||
# Treat these fields as numeric (cast when comparing/sorting)
|
||||
NUMERIC_FIELDS = {"number", "volume", "year", "month", "day"}
|
||||
|
||||
def _field_sql(field: str) -> str:
|
||||
f = (field or "").lower()
|
||||
col = FIELD_MAP.get(f)
|
||||
if not col:
|
||||
if f in ("title","series","number","volume","year","month","day","writer","publisher",
|
||||
"summary","genre","tags","characters","teams","locations"):
|
||||
col = f"m.{f}"
|
||||
else:
|
||||
col = "i.name"
|
||||
def _like_escape(s: str) -> str:
|
||||
# Escape %, _ and backslash for LIKE
|
||||
return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
|
||||
|
||||
def _sql_expr_for_field(field: str) -> str:
|
||||
"""
|
||||
Returns a SQL expression referencing the correct column, with casting for numeric fields.
|
||||
"""
|
||||
col = FIELD_MAP.get(field, f"m.{field}")
|
||||
if field in NUMERIC_FIELDS:
|
||||
# CAST(NULLIF(col,'') AS INTEGER) safely yields NULL for '', non-numeric
|
||||
return f"CAST(NULLIF({col},'') AS INTEGER)"
|
||||
return col
|
||||
|
||||
def _numeric_cast(col: str, field: str) -> str:
|
||||
return f"CAST(COALESCE(NULLIF({col},''),'0') AS INTEGER)" if field in NUMERIC_FIELDS else col
|
||||
def build_smartlist_where(spec_or_groups: Any) -> Tuple[str, List[Any]]:
|
||||
"""
|
||||
Accepts either a full spec {"groups":[...], "join":"AND|OR"} or just a groups array.
|
||||
Returns (where_sql, params) with parameterized values.
|
||||
"""
|
||||
if isinstance(spec_or_groups, dict):
|
||||
groups = spec_or_groups.get("groups") or []
|
||||
across = (spec_or_groups.get("join") or "AND").upper()
|
||||
else:
|
||||
groups = spec_or_groups or []
|
||||
across = "AND"
|
||||
|
||||
def _rule_to_sql(rule: Dict[str, Any]) -> Tuple[str, List[Any]]:
|
||||
field = (rule.get("field") or "").lower()
|
||||
op = (rule.get("op") or "contains").lower()
|
||||
val = rule.get("value")
|
||||
negate = bool(rule.get("not", False))
|
||||
if across not in ("AND", "OR"):
|
||||
across = "AND"
|
||||
|
||||
col = _field_sql(field)
|
||||
col_cmp = _numeric_cast(col, field)
|
||||
|
||||
sql = ""
|
||||
where_parts: List[str] = []
|
||||
params: List[Any] = []
|
||||
|
||||
if op == "exists":
|
||||
sql = f"({col} IS NOT NULL AND {col}!='')"
|
||||
elif op == "missing":
|
||||
sql = f"({col} IS NULL OR {col}='')"
|
||||
elif op == "equals":
|
||||
sql = f"{col} = ?"
|
||||
params = [val]
|
||||
elif op == "contains":
|
||||
sql = f"{col} LIKE ?"
|
||||
params = [f"%{val}%"]
|
||||
elif op == "startswith":
|
||||
sql = f"{col} LIKE ?"
|
||||
params = [f"{val}%"]
|
||||
elif op == "endswith":
|
||||
sql = f"{col} LIKE ?"
|
||||
params = [f"%{val}"]
|
||||
elif op in ("gt","gte","lt","lte"):
|
||||
op_map = {"gt":">","gte":">=","lt":"<","lte":"<="}
|
||||
sql = f"{col_cmp} {op_map[op]} ?"
|
||||
params = [val]
|
||||
else:
|
||||
sql = f"{col} LIKE ?"
|
||||
params = [f"%{val}%"]
|
||||
|
||||
if negate:
|
||||
sql = f"NOT ({sql})"
|
||||
|
||||
return sql, params
|
||||
|
||||
def _groups_to_where(groups: List[Dict[str, Any]]) -> Tuple[str, List[Any]]:
|
||||
if not groups:
|
||||
return "1=1", []
|
||||
|
||||
clauses: List[str] = []
|
||||
params: List[Any] = []
|
||||
|
||||
for g in groups:
|
||||
rs = g.get("rules") or []
|
||||
r_sqls: List[str] = []
|
||||
r_params: List[Any] = []
|
||||
for r in rs:
|
||||
s, p = _rule_to_sql(r)
|
||||
r_sqls.append(s)
|
||||
r_params.extend(p)
|
||||
if r_sqls:
|
||||
clauses.append("(" + " AND ".join(r_sqls) + ")")
|
||||
params.extend(r_params)
|
||||
rules = g.get("rules") or []
|
||||
rule_sqls: List[str] = []
|
||||
|
||||
if not clauses:
|
||||
for r in rules:
|
||||
field = (r.get("field") or "").strip()
|
||||
op = (r.get("op") or "").strip().lower()
|
||||
value = r.get("value")
|
||||
is_not = bool(r.get("not"))
|
||||
|
||||
if not field or op == "":
|
||||
continue
|
||||
|
||||
expr = _sql_expr_for_field(field)
|
||||
|
||||
# Normalize numeric values if needed
|
||||
if field in NUMERIC_FIELDS:
|
||||
try:
|
||||
if isinstance(value, str):
|
||||
value = value.strip()
|
||||
value = int(value)
|
||||
except Exception:
|
||||
# Make an impossible predicate so this rule never matches
|
||||
rule_sqls.append("1=0")
|
||||
continue
|
||||
|
||||
# Operator handling
|
||||
if op in ("=", "eq", "equals"):
|
||||
sql = f"{expr} = ?"; params.append(value)
|
||||
elif op in ("!=", "ne", "notequals"):
|
||||
sql = f"{expr} <> ?"; params.append(value)
|
||||
elif op in (">=", "gte"):
|
||||
sql = f"{expr} >= ?"; params.append(value)
|
||||
elif op in ("<=", "lte"):
|
||||
sql = f"{expr} <= ?"; params.append(value)
|
||||
elif op in (">", "gt"):
|
||||
sql = f"{expr} > ?"; params.append(value)
|
||||
elif op in ("<", "lt"):
|
||||
sql = f"{expr} < ?"; params.append(value)
|
||||
elif op in ("contains", "~"):
|
||||
sql = f"{expr} LIKE ? ESCAPE '\\' COLLATE NOCASE"
|
||||
params.append(f"%{_like_escape(str(value))}%")
|
||||
elif op in ("startswith", "prefix"):
|
||||
sql = f"{expr} LIKE ? ESCAPE '\\' COLLATE NOCASE"
|
||||
params.append(f"{_like_escape(str(value))}%")
|
||||
elif op in ("endswith", "suffix"):
|
||||
sql = f"{expr} LIKE ? ESCAPE '\\' COLLATE NOCASE"
|
||||
params.append(f"%{_like_escape(str(value))}")
|
||||
else:
|
||||
# Unknown op -> skip rule
|
||||
continue
|
||||
|
||||
if is_not:
|
||||
sql = f"NOT ({sql})"
|
||||
|
||||
rule_sqls.append(sql)
|
||||
|
||||
# Default: AND within a group
|
||||
if rule_sqls:
|
||||
where_parts.append("(" + " AND ".join(rule_sqls) + ")")
|
||||
|
||||
if not where_parts:
|
||||
return "1=1", []
|
||||
return "(" + " OR ".join(clauses) + ")", params
|
||||
|
||||
joiner = f" {across} "
|
||||
return joiner.join(where_parts), params
|
||||
|
||||
def _order_by_for_sort(sort: str) -> str:
|
||||
s = (sort or "").lower()
|
||||
@@ -438,7 +461,8 @@ def _order_by_for_sort(sort: str) -> str:
|
||||
|
||||
def smartlist_query(conn: sqlite3.Connection, groups: List[Dict[str, Any]], sort: str,
|
||||
limit: int, offset: int, distinct_by_series: bool):
|
||||
where, params = _groups_to_where(groups)
|
||||
# Build WHERE from groups (OR supply a full spec if you prefer)
|
||||
where, params = build_smartlist_where(groups)
|
||||
order_clause = _order_by_for_sort(sort)
|
||||
|
||||
if not distinct_by_series:
|
||||
@@ -452,8 +476,7 @@ def smartlist_query(conn: sqlite3.Connection, groups: List[Dict[str, Any]], sort
|
||||
"""
|
||||
return conn.execute(sql, (*params, limit, offset)).fetchall()
|
||||
|
||||
# Portable DISTINCT-by-series using a correlated NOT EXISTS:
|
||||
# pick the "newest" per series by (year desc, number desc, mtime desc)
|
||||
# DISTINCT by series (pick "newest" per series)
|
||||
sql = f"""
|
||||
SELECT i.*, m.*
|
||||
FROM items i
|
||||
@@ -488,7 +511,7 @@ def smartlist_query(conn: sqlite3.Connection, groups: List[Dict[str, Any]], sort
|
||||
return conn.execute(sql, (*params, limit, offset)).fetchall()
|
||||
|
||||
def smartlist_count(conn: sqlite3.Connection, groups: List[Dict[str, Any]]) -> int:
|
||||
where, params = _groups_to_where(groups)
|
||||
where, params = build_smartlist_where(groups)
|
||||
row = conn.execute(f"""
|
||||
SELECT COUNT(*)
|
||||
FROM items i
|
||||
@@ -537,10 +560,10 @@ def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
|
||||
LIMIT 20
|
||||
""")
|
||||
]
|
||||
out["top_publishers"] = top_pubs # existing name
|
||||
out["publishers_breakdown"] = top_pubs # alias for dashboards that expect this
|
||||
out["top_publishers"] = top_pubs
|
||||
out["publishers_breakdown"] = top_pubs # alias for dashboards
|
||||
|
||||
# Publication timeline by year (ascending) — used by line chart
|
||||
# Publication timeline by year (ascending)
|
||||
timeline = [
|
||||
{"year": int(row[0]), "count": row[1]}
|
||||
for row in conn.execute("""
|
||||
@@ -554,10 +577,10 @@ def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
|
||||
""")
|
||||
if row[0] is not None
|
||||
]
|
||||
out["timeline_by_year"] = timeline # new
|
||||
out["publication_timeline"] = timeline # alias (some dashboards used this name)
|
||||
out["timeline_by_year"] = timeline
|
||||
out["publication_timeline"] = timeline # alias
|
||||
|
||||
# Top writers (split on commas, normalized) — used by horizontal bar chart
|
||||
# Top writers (split on commas, normalized)
|
||||
rows = conn.execute("""
|
||||
SELECT m.writer
|
||||
FROM items i
|
||||
@@ -580,4 +603,3 @@ def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
|
||||
out["top_writers"] = top_writers
|
||||
|
||||
return out
|
||||
|
||||
|
||||
Reference in New Issue
Block a user