diff --git a/app/db.py b/app/db.py
index 2425a7a..39ef0d2 100644
--- a/app/db.py
+++ b/app/db.py
@@ -8,35 +8,38 @@ from typing import Any, Dict, List, Tuple, Optional
DB_PATH = Path("/data/library.db")
-# Feature flag: set after schema init
HAS_FTS5: bool = False
def has_fts5() -> bool:
    """Return True if the DB initialized an FTS5 virtual table."""
    # Reflects the module-level flag set during schema initialization.
    return HAS_FTS5
-# ----------------------------- Connection & Schema -----------------------------
-
def connect() -> sqlite3.Connection:
    """Open the library database, apply tuning pragmas, and ensure the schema.

    Returns a connection with row access by column name (sqlite3.Row).
    Pragma failures are deliberately ignored (best-effort tuning, e.g. on a
    filesystem that cannot support WAL).
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    # Speed/concurrency pragmas; cache_size is negative => size in KiB (~200MB).
    for pragma in (
        "PRAGMA journal_mode=WAL;",
        "PRAGMA synchronous=NORMAL;",
        "PRAGMA temp_store=MEMORY;",
        "PRAGMA cache_size=-200000;",
    ):
        try:
            conn.execute(pragma)
        except Exception:
            pass
    _ensure_schema(conn)
    return conn
+def _column_exists(conn: sqlite3.Connection, table: str, column: str) -> bool:
+ row = conn.execute(f"PRAGMA table_info({table})").fetchall()
+ return any(r[1].lower() == column.lower() for r in row)
+
def _add_column(conn: sqlite3.Connection, table: str, column: str, decl: str) -> None:
    """Best-effort schema migration: ALTER TABLE *table* ADD COLUMN *column* *decl*.

    OperationalError is swallowed so re-running the migration when the column
    already exists is a no-op. NOTE(review): this also silences other
    OperationalErrors (e.g. missing table) — confirm that is acceptable.
    Table/column names are interpolated, so call only with trusted constants.
    """
    try:
        conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
    except sqlite3.OperationalError:
        pass
+
def _ensure_schema(conn: sqlite3.Connection) -> None:
global HAS_FTS5
- # Core tables
conn.execute("""
CREATE TABLE IF NOT EXISTS items (
rel TEXT PRIMARY KEY,
@@ -66,12 +69,14 @@ def _ensure_schema(conn: sqlite3.Connection) -> None:
characters TEXT,
teams TEXT,
locations TEXT,
- comicvineissue TEXT,
- format TEXT
+ comicvineissue TEXT
)
""")
- # Helpful indexes
+ # migration: ensure 'format' column exists
+ if not _column_exists(conn, "meta", "format"):
+ _add_column(conn, "meta", "format", "TEXT")
+
conn.execute("CREATE INDEX IF NOT EXISTS idx_items_parent ON items(parent)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_items_name ON items(name)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_items_isdir ON items(is_dir)")
@@ -80,9 +85,8 @@ def _ensure_schema(conn: sqlite3.Connection) -> None:
conn.execute("CREATE INDEX IF NOT EXISTS idx_meta_year ON meta(year)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_meta_writer ON meta(writer)")
conn.execute("CREATE INDEX IF NOT EXISTS idx_meta_publisher ON meta(publisher)")
- conn.execute("CREATE INDEX IF NOT EXISTS idx_meta_format ON meta(format)")
+ conn.execute("CREATE INDEX IF NOT EXISTS idx_meta_format ON meta(format)")
- # Try FTS5 — if it fails, we fall back to LIKE search
try:
conn.execute("""
CREATE VIRTUAL TABLE IF NOT EXISTS fts
@@ -98,7 +102,6 @@ def _ensure_schema(conn: sqlite3.Connection) -> None:
# ----------------------------- Scan lifecycle ---------------------------------
def begin_scan(conn: sqlite3.Connection) -> None:
- """Called once at the beginning of a full reindex."""
conn.execute("DELETE FROM items")
conn.execute("DELETE FROM meta")
if HAS_FTS5:
@@ -136,23 +139,25 @@ def upsert_file(conn: sqlite3.Connection, rel: str, name: str, size: int, mtime:
)
def upsert_meta(conn: sqlite3.Connection, rel: str, meta: Dict[str, Any]) -> None:
- fields = (
+ cols = [
"title","series","number","volume","year","month","day",
"writer","publisher","summary","genre","tags","characters",
- "teams","locations","comicvineissue","format"
- )
- vals = [meta.get(k) for k in fields]
+ "teams","locations","comicvineissue"
+ ]
+ if _column_exists(conn, "meta", "format"):
+ cols.append("format")
+
+ vals = [meta.get(k) for k in cols]
exists = conn.execute("SELECT 1 FROM meta WHERE rel=?", (rel,)).fetchone() is not None
if exists:
- sets = ",".join([f"{k}=?" for k in fields])
+ sets = ",".join([f"{k}=?" for k in cols])
conn.execute(f"UPDATE meta SET {sets} WHERE rel=?", (*vals, rel))
else:
- cols = ",".join(fields)
- qms = ",".join(["?"] * len(fields))
- conn.execute(f"INSERT INTO meta(rel,{cols}) VALUES (?,{qms})", (rel, *vals))
+ col_csv = ",".join(cols)
+ qms = ",".join(["?"] * len(cols))
+ conn.execute(f"INSERT INTO meta(rel,{col_csv}) VALUES (?,{qms})", (rel, *vals))
- # Refresh FTS row for this file (only if supported & it's a file)
if HAS_FTS5:
it = conn.execute("SELECT name, is_dir FROM items WHERE rel=?", (rel,)).fetchone()
if not it or int(it["is_dir"]) != 0:
@@ -178,7 +183,8 @@ def upsert_meta(conn: sqlite3.Connection, rel: str, meta: Dict[str, Any]) -> Non
add(meta.get("year"))
add(meta.get("number"))
add(meta.get("volume"))
- add(meta.get("format"))
+ if "format" in meta:
+ add(meta.get("format"))
conn.execute("DELETE FROM fts WHERE rel=?", (rel,))
if parts:
@@ -245,7 +251,6 @@ def search_q(conn: sqlite3.Connection, q: str, limit: int, offset: int):
where.append("i.rel IN (SELECT rel FROM fts WHERE fts MATCH ?)")
params.append(match)
elif words:
- # Fallback LIKEs on selected columns
for w in words:
where.append("""
(
@@ -314,7 +319,6 @@ def search_count(conn: sqlite3.Connection, q: str) -> int:
# ----------------------------- Smart Lists ------------------------------------
-# Map external field names to DB columns
FIELD_MAP: Dict[str, str] = {
"title": "m.title",
"series": "m.series",
@@ -336,37 +340,30 @@ FIELD_MAP: Dict[str, str] = {
"format": "m.format",
}
-# Treat these fields as numeric (cast when comparing/sorting)
NUMERIC_FIELDS = {"number", "volume", "year", "month", "day"}
def _like_escape(s: str) -> str:
- # Escape %, _ and backslash for LIKE
return s.replace("\\", "\\\\").replace("%", "\\%").replace("_", "\\_")
def _sql_expr_for_field(field: str) -> str:
    """Return the SQL expression for *field*, casting numeric fields to INTEGER.

    Unknown fields fall back to a `m.<field>` column reference. For numeric
    fields, NULLIF turns '' into NULL so the CAST yields NULL rather than 0
    for empty/non-numeric values.
    """
    col = FIELD_MAP.get(field, f"m.{field}")
    if field not in NUMERIC_FIELDS:
        return col
    return f"CAST(NULLIF({col},'') AS INTEGER)"
def build_smartlist_where(spec_or_groups: Any) -> Tuple[str, List[Any]]:
"""
- Accepts either a full spec {"groups":[...], "join":"AND|OR"} or just a groups array.
- Returns (where_sql, params) with parameterized values.
+ Groups are OR'd by default. Rules inside a group are AND'd.
"""
if isinstance(spec_or_groups, dict):
groups = spec_or_groups.get("groups") or []
- across = (spec_or_groups.get("join") or "AND").upper()
+ across = (spec_or_groups.get("join") or "OR").upper() # <<< default OR
else:
groups = spec_or_groups or []
- across = "AND"
+ across = "OR" # <<< default OR
if across not in ("AND", "OR"):
- across = "AND"
+ across = "OR"
where_parts: List[str] = []
params: List[Any] = []
@@ -386,18 +383,15 @@ def build_smartlist_where(spec_or_groups: Any) -> Tuple[str, List[Any]]:
expr = _sql_expr_for_field(field)
- # Normalize numeric values if needed
if field in NUMERIC_FIELDS:
try:
if isinstance(value, str):
value = value.strip()
value = int(value)
except Exception:
- # Make an impossible predicate so this rule never matches
rule_sqls.append("1=0")
continue
- # Operator handling
if op in ("=", "eq", "equals"):
sql = f"{expr} = ?"; params.append(value)
elif op in ("!=", "ne", "notequals"):
@@ -420,7 +414,6 @@ def build_smartlist_where(spec_or_groups: Any) -> Tuple[str, List[Any]]:
sql = f"{expr} LIKE ? ESCAPE '\\' COLLATE NOCASE"
params.append(f"%{_like_escape(str(value))}")
else:
- # Unknown op -> skip rule
continue
if is_not:
@@ -428,7 +421,6 @@ def build_smartlist_where(spec_or_groups: Any) -> Tuple[str, List[Any]]:
rule_sqls.append(sql)
- # Default: AND within a group
if rule_sqls:
where_parts.append("(" + " AND ".join(rule_sqls) + ")")
@@ -438,6 +430,50 @@ def build_smartlist_where(spec_or_groups: Any) -> Tuple[str, List[Any]]:
joiner = f" {across} "
return joiner.join(where_parts), params
+# ---- FTS prefilter for smartlists (matches per-group, then ORs groups) ----
+
+_TEXT_FIELDS_FOR_FTS = {
+ "title","series","publisher","writer","summary","genre",
+ "tags","characters","teams","locations","name","filename","format"
+}
+
+def _fts_group_expr_from_rules(rules: List[Dict[str, Any]]) -> Optional[str]:
+ """
+ Build an FTS 'group' expression like: "batman* AND 2016*"
+ Only from rules that are: field in text set, op in ('contains','~'), not negated, and string values.
+ If the group has zero qualifying rules, return None (we'll skip FTS prefilter to avoid over-restricting).
+ """
+ tokens: List[str] = []
+ for r in (rules or []):
+ field = (r.get("field") or "").lower()
+ op = (r.get("op") or "").lower()
+ val = r.get("value")
+ if field in _TEXT_FIELDS_FOR_FTS and op in ("contains","~") and isinstance(val, str) and not r.get("not"):
+ tokens.extend(re.findall(r"[0-9A-Za-z]{2,}", val))
+ if not tokens:
+ return None
+ return " AND ".join(f"{t}*" for t in tokens)
+
def _build_fts_prefilter(groups: List[Dict[str, Any]]) -> Tuple[str, List[Any]]:
    """Build an optional FTS prefilter for smartlist queries.

    Returns ("", []) when FTS5 is unavailable, when there are no groups, or
    when any group cannot be expressed as an FTS match — skipping the
    prefilter keeps results correct (never over-restricts). Otherwise returns
    the fragment " AND i.rel IN (SELECT rel FROM fts WHERE fts MATCH ?)" and
    one parameter OR-ing the per-group expressions, mirroring the OR join
    between groups in the main WHERE clause.
    """
    if not HAS_FTS5:
        return "", []
    group_exprs: List[str] = []
    for group in groups or []:
        expr = _fts_group_expr_from_rules(group.get("rules") or [])
        if expr is None:
            # A group with no 'contains' terms could match rows FTS would drop.
            return "", []
        group_exprs.append(f"({expr})")
    if not group_exprs:
        return "", []
    match_param = " OR ".join(group_exprs)
    return " AND i.rel IN (SELECT rel FROM fts WHERE fts MATCH ?)", [match_param]
+
def _order_by_for_sort(sort: str) -> str:
s = (sort or "").lower()
if s == "issued_asc":
@@ -469,26 +505,6 @@ def _order_by_for_sort(sort: str) -> str:
return "COALESCE(m.series, i.name) ASC, " \
"CAST(COALESCE(NULLIF(m.number,''),'0') AS INTEGER) ASC, i.name ASC"
-# ---- FTS prefilter for smartlists (speeds up 'contains' text rules) ----
-
-_TEXT_FIELDS_FOR_FTS = {
- "title","series","publisher","writer","summary","genre",
- "tags","characters","teams","locations","name","filename",
- "format"
-}
-
-def _extract_fts_terms_from_groups(groups: List[Dict[str, Any]]) -> List[str]:
- terms: List[str] = []
- for g in (groups or []):
- for r in (g.get("rules") or []):
- field = (r.get("field") or "").lower()
- op = (r.get("op") or "").lower()
- val = r.get("value")
- if field in _TEXT_FIELDS_FOR_FTS and op in ("contains","~") and isinstance(val, str) and not r.get("not"):
- tokens = re.findall(r"[0-9A-Za-z]{2,}", val)
- terms.extend(t + "*" for t in tokens)
- return terms
-
# ---- Smartlist runners --------------------------------------------------------
def smartlist_query(
@@ -497,27 +513,13 @@ def smartlist_query(
sort: str,
limit: int,
offset: int,
- distinct_by_series: bool
+ distinct_by_series: Any
):
- """
- Backward-compatible API (used by existing routes).
- - Adds FTS prefilter when possible.
- - If distinct_by_series is 'latest' or 'oldest' (string), uses that mode.
- If True, defaults to 'latest'.
- """
where, params = build_smartlist_where(groups)
order_clause = _order_by_for_sort(sort)
- # Optional FTS prefilter
- fts_sql = ""
- fts_params: List[Any] = []
- if HAS_FTS5:
- tokens = _extract_fts_terms_from_groups(groups)
- if tokens:
- fts_sql = " AND i.rel IN (SELECT rel FROM fts WHERE fts MATCH ?)"
- fts_params = [" AND ".join(tokens)]
+ fts_sql, fts_params = _build_fts_prefilter(groups)
- # Distinct mode handling
mode = "latest"
if isinstance(distinct_by_series, str) and distinct_by_series in ("latest", "oldest"):
use_distinct = True
@@ -536,7 +538,6 @@ def smartlist_query(
"""
return conn.execute(sql, (*params, *fts_params, limit, offset)).fetchall()
- # DISTINCT by (series, volume), with latest/oldest mode
cmp_year = "CAST(COALESCE(NULLIF(m2.year,''),'0') AS INTEGER) {op} CAST(COALESCE(NULLIF(m.year,''),'0') AS INTEGER)"
cmp_number = "CAST(COALESCE(NULLIF(m2.number,''),'0') AS INTEGER) {op} CAST(COALESCE(NULLIF(m.number,''),'0') AS INTEGER)"
cmp_mtime = "i2.mtime {op} i.mtime"
@@ -578,15 +579,7 @@ def smartlist_query(
def smartlist_count(conn: sqlite3.Connection, groups: List[Dict[str, Any]]) -> int:
where, params = build_smartlist_where(groups)
-
- fts_sql = ""
- fts_params: List[Any] = []
- if HAS_FTS5:
- tokens = _extract_fts_terms_from_groups(groups)
- if tokens:
- fts_sql = " AND i.rel IN (SELECT rel FROM fts WHERE fts MATCH ?)"
- fts_params = [" AND ".join(tokens)]
-
+ fts_sql, fts_params = _build_fts_prefilter(groups)
row = conn.execute(f"""
SELECT COUNT(*)
FROM items i
@@ -600,7 +593,6 @@ def smartlist_count(conn: sqlite3.Connection, groups: List[Dict[str, Any]]) -> i
def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
out: Dict[str, Any] = {}
- # Core counts
out["total_comics"] = conn.execute(
"SELECT COUNT(*) FROM items WHERE is_dir=0"
).fetchone()[0]
@@ -621,48 +613,6 @@ def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
"SELECT MAX(mtime) FROM items"
).fetchone()[0]
- # Formats breakdown (top 12 + "Other")
- rows = conn.execute("""
- SELECT LOWER(TRIM(IFNULL(m.format,''))) AS fmt, COUNT(*) AS c
- FROM items i
- LEFT JOIN meta m ON m.rel=i.rel
- WHERE i.is_dir=0
- GROUP BY fmt
- """).fetchall()
-
- # normalize common aliases
- alias = {
- "trade paperback": "tpb",
- "tpb": "tpb",
- "hardcover": "hc",
- "hc": "hc",
- "one-shot": "one-shot",
- "oneshot": "one-shot",
- "limited series": "limited series",
- "ongoing series": "ongoing series",
- "graphic novel": "graphic novel",
- "web": "web",
- "digital": "digital",
- }
- counts = {}
- for r in rows:
- key = (r["fmt"] or "").strip()
- if not key:
- key = "(unknown)"
- key = alias.get(key, key)
- counts[key] = counts.get(key, 0) + int(r["c"])
-
- # top 12 + other
- sorted_items = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
- top = sorted_items[:12]
- other_count = sum(v for _, v in sorted_items[12:])
- formats = [{"format": k, "count": v} for k, v in top]
- if other_count:
- formats.append({"format": "other", "count": other_count})
-
- out["formats_breakdown"] = formats
-
- # Publishers breakdown (top N)
top_pubs = [
{"publisher": row[0], "count": row[1]}
for row in conn.execute("""
@@ -677,9 +627,8 @@ def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
""")
]
out["top_publishers"] = top_pubs
- out["publishers_breakdown"] = top_pubs # alias for dashboards
+ out["publishers_breakdown"] = top_pubs
- # Publication timeline by year (ascending)
timeline = [
{"year": int(row[0]), "count": row[1]}
for row in conn.execute("""
@@ -694,9 +643,38 @@ def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
if row[0] is not None
]
out["timeline_by_year"] = timeline
- out["publication_timeline"] = timeline # alias
+ out["publication_timeline"] = timeline
+
+ # formats breakdown (expects column present; unknowns grouped)
+ rows = conn.execute("""
+ SELECT LOWER(TRIM(IFNULL(m.format,''))) AS fmt, COUNT(*) AS c
+ FROM items i
+ LEFT JOIN meta m ON m.rel=i.rel
+ WHERE i.is_dir=0
+ GROUP BY fmt
+ """).fetchall()
+ alias = {
+ "trade paperback": "tpb", "tpb":"tpb",
+ "hardcover":"hc", "hc":"hc",
+ "one-shot":"one-shot","oneshot":"one-shot",
+ "limited series":"limited series",
+ "ongoing series":"ongoing series",
+ "graphic novel":"graphic novel",
+ "web":"web","digital":"digital"
+ }
+ counts: Dict[str,int] = {}
+ for r in rows:
+ key = (r["fmt"] or "").strip() or "(unknown)"
+ key = alias.get(key, key)
+ counts[key] = counts.get(key, 0) + int(r["c"])
+ sorted_items = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)
+ top = sorted_items[:12]
+ other = sum(v for _, v in sorted_items[12:])
+ formats = [{"format": k, "count": v} for k, v in top]
+ if other:
+ formats.append({"format":"other","count":other})
+ out["formats_breakdown"] = formats
- # Top writers (split on commas, normalized)
rows = conn.execute("""
SELECT m.writer
FROM items i
@@ -704,18 +682,17 @@ def stats(conn: sqlite3.Connection) -> Dict[str, Any]:
WHERE i.is_dir=0 AND m.writer IS NOT NULL AND TRIM(m.writer)!=''
""").fetchall()
- counts: Dict[str, int] = {}
+ counts_w: Dict[str, int] = {}
for (w,) in rows:
for name in (x.strip() for x in w.split(",") if x.strip()):
key = name.lower()
- counts[key] = counts.get(key, 0) + 1
+ counts_w[key] = counts_w.get(key, 0) + 1
top_writers = sorted(
- ({"writer": name.title(), "count": c} for name, c in counts.items()),
+ ({"writer": name.title(), "count": c} for name, c in counts_w.items()),
key=lambda d: d["count"],
reverse=True,
- )[:10]
-
+ )[:20]
out["top_writers"] = top_writers
return out
diff --git a/app/templates/entry.xml.j2 b/app/templates/entry.xml.j2
index 9c31c8c..db4fc43 100644
--- a/app/templates/entry.xml.j2
+++ b/app/templates/entry.xml.j2
@@ -15,7 +15,7 @@
-
+