#!/usr/bin/env python3 """ ComicVine SQLite Database Updater A stand-alone script to keep a localcv.db SQLite database updated with new ComicVine data. Fetches publishers, volumes, and issues that have been added or modified since the last sync. This version creates dated copies: localcv-YYYY-MM-DD.db Usage: # Interactive mode python sqlite_cv_updater.py [database_path] # Non-interactive mode (for cron) - creates today's database from yesterday's python sqlite_cv_updater.py --non-interactive --db-dir /path/to/db/directory # Using environment variable export COMICVINE_API_KEY=your_key_here python sqlite_cv_updater.py --non-interactive --db-dir /path/to/db/directory Example crontab entry (runs daily at 3 AM): 0 3 * * * /usr/bin/python3 /path/to/sqlite_cv_updater.py --non-interactive --db-dir /path/to/db >> /var/log/comicvine_sync.log 2>&1 """ import argparse import json import os import shutil import smtplib import sqlite3 import sys import time from datetime import datetime, timedelta from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from io import StringIO from pathlib import Path from typing import Any try: import requests except ImportError: print("ERROR: requests library is required.") print("Install with: pip install requests") sys.exit(1) try: import tkinter as tk from tkinter import filedialog HAS_GUI = True except ImportError: HAS_GUI = False # API Configuration API_BASE_URL = "https://comicvine.gamespot.com/api/" HEADERS = {"User-Agent": "LocalCVUpdater/1.0", "From": "comicvine-user@example.com"} # Rate limiting API_DELAY = 1.5 # Seconds between API calls (ComicVine limit is ~200/hour) MAX_RETRIES = 3 class LogCapture: """Captures log output for email reporting.""" def __init__(self): self.buffer = StringIO() self.start_time = datetime.now() def write(self, message: str) -> None: """Write a message to the buffer.""" self.buffer.write(message) if not message.endswith('\n'): self.buffer.write('\n') def get_contents(self) -> str: """Get all captured log contents.""" return self.buffer.getvalue() def get_duration(self) -> str: """Get the duration since start.""" duration = datetime.now() - self.start_time hours, remainder = divmod(int(duration.total_seconds()), 3600) minutes, seconds = divmod(remainder, 60) if hours > 0: return f"{hours}h {minutes}m {seconds}s" elif minutes > 0: return f"{minutes}m {seconds}s" else: return f"{seconds}s" class ComicVineUpdater: """Updates a SQLite database with ComicVine data.""" def __init__(self, db_path: str, api_key: str, verbose: bool = True, log_capture: LogCapture | None = None): self.db_path = Path(db_path) self.api_key = api_key self.verbose = verbose self.log_capture = log_capture self.conn: sqlite3.Connection | None = None self.stats = { "publishers_added": 0, "publishers_updated": 0, "persons_added": 0, "persons_updated": 0, "volumes_added": 0, "volumes_updated": 0, "issues_added": 0, "issues_updated": 0, "api_calls": 0, } def log(self, message: str, force: bool = False) -> None: """Print message if verbose mode is enabled.""" if self.verbose or force: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") formatted = f"[{timestamp}] {message}" print(formatted) # Always capture to log buffer if available (for email) if self.log_capture: timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") self.log_capture.write(f"[{timestamp}] {message}") def connect(self) -> bool: """Connect to the SQLite database.""" if not self.db_path.exists(): self.log(f"ERROR: Database not found: {self.db_path}", force=True) return False try: self.conn = sqlite3.connect(self.db_path) self.conn.row_factory = sqlite3.Row # Enable WAL mode for better concurrent access self.conn.execute("PRAGMA journal_mode=WAL") self.conn.execute("PRAGMA synchronous=NORMAL") self.log(f"Connected to database: {self.db_path}") return True except sqlite3.Error as e: self.log(f"ERROR: Could not connect to database: {e}", force=True) return False def close(self) -> None: """Close the database connection.""" if self.conn: self.conn.close() self.conn = None def ensure_metadata_table(self) -> None: """Create metadata table if it doesn't exist.""" if not self.conn: return # Create table if not exists with new schema self.conn.execute(""" CREATE TABLE IF NOT EXISTS cv_sync_metadata ( endpoint TEXT PRIMARY KEY, last_sync_date TEXT NOT NULL, last_sync_timestamp TEXT NOT NULL, resume_state TEXT ) """) # Check if resume_state column exists (for migration of existing DBs) cur = self.conn.execute("PRAGMA table_info(cv_sync_metadata)") columns = [info[1] for info in cur.fetchall()] if "resume_state" not in columns: self.log(" Adding resume_state column to cv_sync_metadata...") try: self.conn.execute("ALTER TABLE cv_sync_metadata ADD COLUMN resume_state TEXT") except sqlite3.OperationalError as e: self.log(f" Warning: Could not add resume_state column: {e}") self.conn.commit() def get_endpoint_last_sync(self, endpoint: str) -> str | None: """Get the last sync date for a specific endpoint.""" if not self.conn: return None try: cur = self.conn.execute( "SELECT last_sync_date FROM cv_sync_metadata WHERE endpoint = ?", (endpoint,) ) row = cur.fetchone() return row[0] if row else None except sqlite3.OperationalError: return None def set_endpoint_last_sync(self, endpoint: str, date_str: str) -> None: """Set the last sync date for a specific endpoint.""" if not self.conn: return timestamp = datetime.now().isoformat() # Use upsert to preserve resume_state if it exists self.conn.execute( """INSERT INTO cv_sync_metadata (endpoint, last_sync_date, last_sync_timestamp) VALUES (?, ?, ?) ON CONFLICT(endpoint) DO UPDATE SET last_sync_date=excluded.last_sync_date, last_sync_timestamp=excluded.last_sync_timestamp""", (endpoint, date_str, timestamp), ) self.conn.commit() def get_resume_state(self, endpoint: str) -> dict[str, Any] | None: """Get the resumption state for an endpoint.""" if not self.conn: return None try: cur = self.conn.execute( "SELECT resume_state FROM cv_sync_metadata WHERE endpoint = ?", (endpoint,) ) row = cur.fetchone() if row and row[0]: return json.loads(row[0]) return None except (sqlite3.OperationalError, json.JSONDecodeError): return None def save_resume_state(self, endpoint: str, state: dict[str, Any]) -> None: """Save the resumption state for an endpoint.""" if not self.conn: return try: state_json = json.dumps(state) # update existing row, don't insert new one if not exists (metadata should exist if we are syncing) self.conn.execute( "UPDATE cv_sync_metadata SET resume_state = ? WHERE endpoint = ?", (state_json, endpoint) ) # If update affected 0 rows, we might need to insert (though unlikely if sync started) if self.conn.total_changes == 0: # fallback insert with dummy date if needed, relying on ensure_metadata_table logic usually pass self.conn.commit() except sqlite3.Error as e: self.log(f" Warning: Could not save resume state: {e}") def clear_resume_state(self, endpoint: str) -> None: """Clear the resumption state for an endpoint.""" if not self.conn: return try: self.conn.execute( "UPDATE cv_sync_metadata SET resume_state = NULL WHERE endpoint = ?", (endpoint,) ) self.conn.commit() except sqlite3.Error: pass def calculate_safe_start_date(self, endpoint: str) -> str: """ Calculate a safe start date for an endpoint by checking its last sync, then going back 1 day for safety margin. """ last_sync = self.get_endpoint_last_sync(endpoint) if last_sync: try: last_date = datetime.strptime(last_sync, "%Y-%m-%d") # Go back 1 day for safety safe_date = (last_date - timedelta(days=1)).strftime("%Y-%m-%d") return safe_date except ValueError: pass # Default to 30 days ago if we can't determine default_date = (datetime.now() - timedelta(days=30)).strftime("%Y-%m-%d") return default_date def api_call( self, endpoint: str, params: dict[str, Any] | None = None ) -> dict[str, Any] | None: """Make an API call to ComicVine with rate limiting and retries.""" if params is None: params = {} params["api_key"] = self.api_key params["format"] = "json" url = f"{API_BASE_URL}{endpoint}" for attempt in range(MAX_RETRIES): try: time.sleep(API_DELAY) self.stats["api_calls"] += 1 response = requests.get(url, params=params, headers=HEADERS, timeout=30) # Handle rate limiting if response.status_code == 420: wait_time = 60 * (attempt + 1) self.log(f" Rate limited. Waiting {wait_time} seconds...") time.sleep(wait_time) continue response.raise_for_status() data = response.json() if data.get("status_code") == 100: self.log("ERROR: Invalid API key!", force=True) return None if data.get("status_code") != 1: error = data.get("error", "Unknown error") self.log(f" API error: {error}") return None return data except requests.exceptions.Timeout: self.log(f" Timeout on attempt {attempt + 1}/{MAX_RETRIES}") time.sleep(5) except requests.exceptions.RequestException as e: self.log(f" Request error: {e}") if attempt < MAX_RETRIES - 1: time.sleep(5) return None def sync_publishers(self, start_date: str, end_date: str) -> None: """Sync publishers updated since start_date.""" self.log("\n--- Syncing Publishers ---") if not self.conn: return offset = 0 limit = 100 while True: params = { "field_list": "id,name,image,site_detail_url", "offset": offset, "limit": limit, "filter": f"date_last_updated:{start_date}|{end_date}", "sort": "date_last_updated:asc", } data = self.api_call("publishers/", params) if not data: # If API call failed (after retries), we must stop and NOT update sync date raise RuntimeError("API call failed for publishers") results = data.get("results", []) if not results: break for pub in results: pub_id = pub.get("id") if not pub_id: continue name = pub.get("name") image_url = None if pub.get("image"): image_url = pub["image"].get("original_url") site_url = pub.get("site_detail_url") # Check if exists cur = self.conn.execute( "SELECT id FROM cv_publisher WHERE id = ?", (pub_id,) ) exists = cur.fetchone() is not None self.conn.execute( """INSERT OR REPLACE INTO cv_publisher (id, name, image_url, site_detail_url) VALUES (?, ?, ?, ?)""", (pub_id, name, image_url, site_url), ) if exists: self.stats["publishers_updated"] += 1 else: self.stats["publishers_added"] += 1 self.conn.commit() total = data.get("number_of_total_results", 0) self.log(f" Processed {offset + len(results)}/{total} publishers") if len(results) < limit: break offset += limit # Update metadata after successful completion self.set_endpoint_last_sync("publishers", end_date) def sync_persons(self, start_date: str, end_date: str) -> None: """Sync persons/creators updated since start_date.""" self.log("\n--- Syncing Persons ---") if not self.conn: return offset = 0 limit = 100 while True: params = { "field_list": "id,name", "offset": offset, "limit": limit, "filter": f"date_last_updated:{start_date}|{end_date}", "sort": "date_last_updated:asc", } data = self.api_call("people/", params) if not data: raise RuntimeError("API call failed for people") results = data.get("results", []) if not results: break for person in results: person_id = person.get("id") if not person_id: continue name = person.get("name") # Check if exists cur = self.conn.execute( "SELECT id FROM cv_person WHERE id = ?", (person_id,) ) exists = cur.fetchone() is not None self.conn.execute( "INSERT OR REPLACE INTO cv_person (id, name) VALUES (?, ?)", (person_id, name), ) if exists: self.stats["persons_updated"] += 1 else: self.stats["persons_added"] += 1 self.conn.commit() total = data.get("number_of_total_results", 0) self.log(f" Processed {offset + len(results)}/{total} persons") if len(results) < limit: break offset += limit # Update metadata after successful completion self.set_endpoint_last_sync("people", end_date) def sync_volumes(self, start_date: str, end_date: str) -> None: """Sync volumes updated since start_date.""" self.log("\n--- Syncing Volumes ---") if not self.conn: return offset = 0 limit = 100 while True: params = { "field_list": "id,name,aliases,start_year,publisher,count_of_issues,description,image,site_detail_url", "offset": offset, "limit": limit, "filter": f"date_last_updated:{start_date}|{end_date}", "sort": "date_last_updated:asc", } data = self.api_call("volumes/", params) if not data: raise RuntimeError("API call failed for volumes") results = data.get("results", []) if not results: break for vol in results: vol_id = vol.get("id") if not vol_id: continue name = vol.get("name") aliases = vol.get("aliases") start_year = vol.get("start_year") publisher_id = None if vol.get("publisher"): publisher_id = vol["publisher"].get("id") count_of_issues = vol.get("count_of_issues") description = vol.get("description") image_url = None if vol.get("image"): image_url = vol["image"].get("original_url") site_url = vol.get("site_detail_url") # Check if exists cur = self.conn.execute( "SELECT id FROM cv_volume WHERE id = ?", (vol_id,) ) exists = cur.fetchone() is not None self.conn.execute( """INSERT OR REPLACE INTO cv_volume (id, name, aliases, start_year, publisher_id, count_of_issues, description, image_url, site_detail_url) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( vol_id, name, aliases, start_year, publisher_id, count_of_issues, description, image_url, site_url, ), ) if exists: self.stats["volumes_updated"] += 1 else: self.stats["volumes_added"] += 1 self.conn.commit() total = data.get("number_of_total_results", 0) self.log(f" Processed {offset + len(results)}/{total} volumes") if len(results) < limit: break offset += limit # Update metadata after successful completion self.set_endpoint_last_sync("volumes", end_date) def fetch_issue_details(self, issue_id: int) -> dict[str, Any] | None: """Fetch detailed issue info including credits.""" data = self.api_call(f"issue/4000-{issue_id}/") if data: return data.get("results") return None def sync_issues(self, start_date: str, end_date: str) -> None: """Sync issues updated since start_date.""" self.log("\n--- Syncing Issues ---") if not self.conn: return # Check for resume state resume_state = self.get_resume_state("issues") processed_ids = set() if resume_state and "processed_ids" in resume_state: processed_ids = set(resume_state["processed_ids"]) self.log(f" Resuming from previous run. Skipping {len(processed_ids)} already processed issues.") # Ensure metadata row exists so we can save state # Use start_date as the placeholder last_sync so if we crash, we pick up roughly from here timestamp = datetime.now().isoformat() self.conn.execute( """INSERT INTO cv_sync_metadata (endpoint, last_sync_date, last_sync_timestamp) VALUES (?, ?, ?) ON CONFLICT(endpoint) DO NOTHING""", ("issues", start_date, timestamp), ) self.conn.commit() # First, get list of updated issues issue_ids: list[int] = [] offset = 0 limit = 100 self.log(" Finding updated issues...") while True: params = { "field_list": "id", "offset": offset, "limit": limit, "filter": f"date_last_updated:{start_date}|{end_date}", "sort": "date_last_updated:asc", } data = self.api_call("issues/", params) if not data: raise RuntimeError("API call failed for issues") results = data.get("results", []) if not results: break for issue in results: issue_id = issue.get("id") if issue_id: issue_ids.append(issue_id) total = data.get("number_of_total_results", 0) self.log(f" Found {offset + len(results)}/{total} issues to update") if len(results) < limit: break offset += limit # Filter out already processed issues initial_count = len(issue_ids) issue_ids = [iid for iid in issue_ids if iid not in processed_ids] skipped_count = initial_count - len(issue_ids) if skipped_count > 0: self.log(f" Skipping {skipped_count} issues (already processed). {len(issue_ids)} remaining.") # Now fetch details for each issue self.log(f" Fetching details for {len(issue_ids)} issues...") for i, issue_id in enumerate(issue_ids): issue_data = self.fetch_issue_details(issue_id) if not issue_data: continue # Check if exists cur = self.conn.execute( "SELECT id FROM cv_issue WHERE id = ?", (issue_id,) ) exists = cur.fetchone() is not None # Extract data volume_id = None if issue_data.get("volume"): volume_id = issue_data["volume"].get("id") name = issue_data.get("name") issue_number = issue_data.get("issue_number") cover_date = issue_data.get("cover_date") store_date = issue_data.get("store_date") description = issue_data.get("description") image_url = None if issue_data.get("image"): image_url = issue_data["image"].get("original_url") site_url = issue_data.get("site_detail_url") # Credits as JSON (ensure_ascii=False preserves UTF-8 characters) character_credits = json.dumps(issue_data.get("character_credits") or [], ensure_ascii=False) person_credits = json.dumps(issue_data.get("person_credits") or [], ensure_ascii=False) team_credits = json.dumps(issue_data.get("team_credits") or [], ensure_ascii=False) location_credits = json.dumps(issue_data.get("location_credits") or [], ensure_ascii=False) story_arc_credits = json.dumps(issue_data.get("story_arc_credits") or [], ensure_ascii=False) associated_images = json.dumps(issue_data.get("associated_images") or [], ensure_ascii=False) self.conn.execute( """INSERT OR REPLACE INTO cv_issue (id, volume_id, name, issue_number, cover_date, store_date, description, image_url, site_detail_url, character_credits, person_credits, team_credits, location_credits, story_arc_credits, associated_images) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", ( issue_id, volume_id, name, issue_number, cover_date, store_date, description, image_url, site_url, character_credits, person_credits, team_credits, location_credits, story_arc_credits, associated_images, ), ) if exists: self.stats["issues_updated"] += 1 else: self.stats["issues_added"] += 1 # Print update for every 10th issue in verbose mode, or milestone in non-verbose if self.verbose or (i + 1) % 100 == 0: action = "Updated" if exists else "Added" self.log(f" [{i + 1}/{len(issue_ids)}] {action} issue: {name} (#{issue_number})") # Track progress processed_ids.add(issue_id) # Commit periodically and save state if (i + 1) % 50 == 0: self.save_resume_state("issues", {"processed_ids": list(processed_ids)}) self.conn.commit() self.log(f" --- SAVED RESUME STATE ({i + 1}/{len(issue_ids)}) ---") self.conn.commit() # Clear resume state on successful completion self.clear_resume_state("issues") self.log(f" Completed {len(issue_ids)} issues") # Update metadata after successful completion self.set_endpoint_last_sync("issues", end_date) def recalculate_volume_issue_counts(self) -> None: """ Recalculate count_of_issues for all volumes based on actual issue counts. This is necessary because ComicVine doesn't always update a volume's date_last_updated when new issues are added, so the count_of_issues from the API can become stale. """ self.log("\n--- Recalculating Volume Issue Counts ---") if not self.conn: return try: # Get all volumes that have a mismatch between stored count and actual count cur = self.conn.execute(""" SELECT v.id, v.name, v.count_of_issues as stored_count, COALESCE(i.actual_count, 0) as actual_count FROM cv_volume v LEFT JOIN ( SELECT volume_id, COUNT(*) as actual_count FROM cv_issue GROUP BY volume_id ) i ON v.id = i.volume_id WHERE v.count_of_issues != COALESCE(i.actual_count, 0) OR (v.count_of_issues IS NULL AND i.actual_count > 0) """) mismatches = cur.fetchall() if not mismatches: self.log(" All volume issue counts are accurate") return self.log(f" Found {len(mismatches)} volumes with incorrect counts") # Update each mismatched volume updated = 0 for row in mismatches: vol_id, vol_name, stored, actual = row self.conn.execute( "UPDATE cv_volume SET count_of_issues = ? WHERE id = ?", (actual, vol_id) ) updated += 1 # Log a few examples in verbose mode if self.verbose and updated <= 5: self.log(f" {vol_name}: {stored} -> {actual}") if updated > 5: self.log(f" ... and {updated - 5} more") self.conn.commit() self.log(f" Updated {updated} volume counts") except sqlite3.Error as e: self.log(f" Error recalculating counts: {e}") def rebuild_fts_index(self) -> None: """Rebuild the FTS5 full-text search index.""" self.log("\n--- Rebuilding Search Index ---") if not self.conn: return try: # Check if FTS table exists cur = self.conn.execute( "SELECT name FROM sqlite_master WHERE type='table' AND name='volume_fts'" ) if cur.fetchone(): self.conn.execute("INSERT INTO volume_fts(volume_fts) VALUES('rebuild')") self.conn.commit() self.log(" FTS index rebuilt") else: self.log(" FTS index not found, skipping") except sqlite3.Error as e: self.log(f" Warning: Could not rebuild FTS index: {e}") def print_stats(self) -> None: """Print sync statistics.""" self.log("\n" + "=" * 50, force=True) self.log("SYNC COMPLETE", force=True) self.log("=" * 50, force=True) self.log(f"API calls made: {self.stats['api_calls']}", force=True) self.log(f"Publishers added: {self.stats['publishers_added']}", force=True) self.log(f"Publishers updated: {self.stats['publishers_updated']}", force=True) self.log(f"Persons added: {self.stats['persons_added']}", force=True) self.log(f"Persons updated: {self.stats['persons_updated']}", force=True) self.log(f"Volumes added: {self.stats['volumes_added']}", force=True) self.log(f"Volumes updated: {self.stats['volumes_updated']}", force=True) self.log(f"Issues added: {self.stats['issues_added']}", force=True) self.log(f"Issues updated: {self.stats['issues_updated']}", force=True) def run_sync(self, start_date: str | None = None) -> bool: """Run the full sync process.""" # Ensure metadata table exists self.ensure_metadata_table() # Determine end date (always today) end_date = datetime.now().strftime("%Y-%m-%d") # Sync each endpoint with its own start date # Publishers pub_start = start_date if start_date else self.calculate_safe_start_date("publishers") last_pub = self.get_endpoint_last_sync("publishers") if last_pub: self.log(f"Publishers last synced: {last_pub}") self.log(f"Syncing publishers from {pub_start} to {end_date}") try: self.sync_publishers(pub_start, end_date) except RuntimeError as e: self.log(f"ERROR: {e}. Skipping metadata update for publishers.", force=True) # Persons person_start = start_date if start_date else self.calculate_safe_start_date("people") last_person = self.get_endpoint_last_sync("people") if last_person: self.log(f"People last synced: {last_person}") self.log(f"Syncing people from {person_start} to {end_date}") try: self.sync_persons(person_start, end_date) except RuntimeError as e: self.log(f"ERROR: {e}. Skipping metadata update for people.", force=True) # Volumes vol_start = start_date if start_date else self.calculate_safe_start_date("volumes") last_vol = self.get_endpoint_last_sync("volumes") if last_vol: self.log(f"Volumes last synced: {last_vol}") self.log(f"Syncing volumes from {vol_start} to {end_date}") try: self.sync_volumes(vol_start, end_date) except RuntimeError as e: self.log(f"ERROR: {e}. Skipping metadata update for volumes.", force=True) # Issues issue_start = start_date if start_date else self.calculate_safe_start_date("issues") last_issue = self.get_endpoint_last_sync("issues") if last_issue: self.log(f"Issues last synced: {last_issue}") self.log(f"Syncing issues from {issue_start} to {end_date}") try: self.sync_issues(issue_start, end_date) except RuntimeError as e: self.log(f"ERROR: {e}. Skipping metadata update for issues.", force=True) # Recalculate volume issue counts (API counts can be stale) self.recalculate_volume_issue_counts() # Rebuild search index self.rebuild_fts_index() # Print stats self.print_stats() return True def validate_api_key(api_key: str, verbose: bool = True) -> bool: """Validate the API key by making a test call.""" if verbose: print("Validating API key...") try: response = requests.get( f"{API_BASE_URL}publishers/", params={"api_key": api_key, "format": "json", "limit": 1}, headers=HEADERS, timeout=30, ) data = response.json() if data.get("status_code") == 100: if verbose: print("ERROR: Invalid API key!") return False if data.get("status_code") == 1: if verbose: print("API key validated successfully!") return True if verbose: print(f"Unexpected API response: {data.get('error', 'Unknown')}") return False except requests.exceptions.RequestException as e: if verbose: print(f"ERROR: Could not connect to ComicVine: {e}") return False def find_latest_database(db_dir: Path, today: str) -> Path | None: """Find the most recent database file before today.""" # Look for dated databases pattern = "localcv-*.db" db_files = sorted(db_dir.glob(pattern), reverse=True) today_date = datetime.strptime(today, "%Y-%m-%d").date() for db_file in db_files: # Extract date from filename try: date_str = db_file.stem.replace("localcv-", "") file_date = datetime.strptime(date_str, "%Y-%m-%d").date() # Find the most recent file before today if file_date < today_date: return db_file except ValueError: continue # Fallback to non-dated localcv.db default_db = db_dir / "localcv.db" if default_db.exists(): return default_db return None def create_todays_database(db_dir: Path, today: str, verbose: bool = True) -> Path | None: """Create today's database by copying from the most recent one.""" source_db = find_latest_database(db_dir, today) if not source_db: if verbose: print(f"ERROR: No source database found in {db_dir}", file=sys.stderr) print("Expected format: localcv-YYYY-MM-DD.db or localcv.db", file=sys.stderr) return None target_db = db_dir / f"localcv-{today}.db" # Check if today's database already exists if target_db.exists(): if verbose: print(f"Database for {today} already exists: {target_db}") return target_db # Copy the database if verbose: print(f"Creating {target_db.name} from {source_db.name}...") try: # Copy with progress for large files source_size = source_db.stat().st_size if verbose and source_size > 100 * 1024 * 1024: # > 100MB print(f" Copying {source_size / (1024**3):.2f} GB database...") shutil.copy2(source_db, target_db) # Also copy WAL and SHM files if they exist for ext in ["-wal", "-shm"]: source_aux = Path(str(source_db) + ext) if source_aux.exists(): target_aux = Path(str(target_db) + ext) shutil.copy2(source_aux, target_aux) if verbose: print(f" Database copied successfully") return target_db except (OSError, shutil.Error) as e: if verbose: print(f"ERROR: Failed to copy database: {e}", file=sys.stderr) return None def cleanup_old_databases(db_dir: Path, keep_days: int = 7, verbose: bool = True) -> int: """ Delete old dated database files, keeping only the most recent ones. Args: db_dir: Directory containing the databases keep_days: Number of most recent databases to keep (default: 7) verbose: Print progress messages Returns: Number of databases deleted """ if verbose: print(f"\n--- Cleaning Up Old Databases (keeping last {keep_days} days) ---") # Find all dated database files pattern = "localcv-*.db" db_files = [] for db_file in db_dir.glob(pattern): # Extract date from filename try: date_str = db_file.stem.replace("localcv-", "") file_date = datetime.strptime(date_str, "%Y-%m-%d").date() db_files.append((file_date, db_file)) except ValueError: # Skip files that don't match the date pattern continue if not db_files: if verbose: print(" No dated database files found") return 0 # Sort by date (newest first) db_files.sort(reverse=True) # Keep the most recent ones files_to_keep = db_files[:keep_days] files_to_delete = db_files[keep_days:] if not files_to_delete: if verbose: print(f" Only {len(files_to_keep)} database(s) found, nothing to delete") return 0 # Delete old files deleted_count = 0 total_size_freed = 0 for file_date, db_file in files_to_delete: try: # Get file size before deleting file_size = db_file.stat().st_size # Delete main database file db_file.unlink() deleted_count += 1 total_size_freed += file_size if verbose: size_mb = file_size / (1024**2) print(f" Deleted: {db_file.name} ({size_mb:.1f} MB)") # Also delete associated WAL and SHM files if they exist for ext in ["-wal", "-shm"]: aux_file = Path(str(db_file) + ext) if aux_file.exists(): aux_size = aux_file.stat().st_size aux_file.unlink() total_size_freed += aux_size except OSError as e: if verbose: print(f" Warning: Could not delete {db_file.name}: {e}") if verbose and deleted_count > 0: size_gb = total_size_freed / (1024**3) print(f" Deleted {deleted_count} old database(s), freed {size_gb:.2f} GB") return deleted_count def get_script_dir() -> Path: """Get the directory where the script is located.""" return Path(__file__).parent.resolve() def get_config_file_path() -> Path: """Get path to config file in script directory.""" return get_script_dir() / "comicvine_config.env" def load_config_file() -> dict[str, str]: """Load configuration from .env file in script directory.""" config = {} config_file = get_config_file_path() if not config_file.exists(): return config try: with open(config_file, 'r') as f: for line in f: line = line.strip() # Skip empty lines and comments if not line or line.startswith('#'): continue # Parse KEY=VALUE format if '=' in line: key, value = line.split('=', 1) key = key.strip() value = value.strip() # Remove quotes if present if value.startswith('"') and value.endswith('"'): value = value[1:-1] elif value.startswith("'") and value.endswith("'"): value = value[1:-1] config[key] = value return config except OSError as e: print(f"Warning: Could not read config file: {e}") return config def save_config_file(config: dict[str, str]) -> bool: """Save configuration to .env file in script directory.""" config_file = get_config_file_path() try: with open(config_file, 'w') as f: f.write("# ComicVine Database Updater Configuration\n") f.write("# This file is automatically generated\n\n") # API Key if 'COMICVINE_API_KEY' in config: f.write(f"COMICVINE_API_KEY={config['COMICVINE_API_KEY']}\n\n") # Email settings f.write("# Email Configuration\n") for key in ['EMAIL_TO', 'EMAIL_FROM', 'SMTP_HOST', 'SMTP_PORT', 'SMTP_USER', 'SMTP_PASS']: if key in config: f.write(f"{key}={config[key]}\n") return True except OSError as e: print(f"Warning: Could not save config file: {e}") return False def get_api_key_interactive() -> str | None: """Get API key interactively from user.""" config_file = get_config_file_path() config = load_config_file() # Check for saved key in config if 'COMICVINE_API_KEY' in config: print(f"Found saved API key in {config_file}") use_saved = input("Use saved API key? (Y/n): ").strip().lower() if use_saved in ("", "y", "yes"): return config['COMICVINE_API_KEY'] # Ask user for key print("\n" + "=" * 50) print("ComicVine API Key Required") print("=" * 50) print("To use this tool, you need a ComicVine API key.") print("Get one free at: https://comicvine.gamespot.com/api/") print() api_key = input("Enter your ComicVine API key: ").strip() if not api_key: return None # Offer to save the key save_key = input("Save this API key for future use? (y/N): ").strip().lower() if save_key in ("y", "yes"): config['COMICVINE_API_KEY'] = api_key if save_config_file(config): print(f"API key saved to {config_file}") return api_key def get_database_path_gui() -> Path | None: """Get database path using GUI file picker.""" if not HAS_GUI: return None try: root = tk.Tk() root.withdraw() # Hide the main window root.attributes('-topmost', True) # Bring dialog to front file_path = filedialog.askopenfilename( title="Select ComicVine SQLite Database", filetypes=[("SQLite Database", "*.db"), ("All Files", "*.*")], initialfile="localcv.db" ) root.destroy() if file_path: return Path(file_path) return None except Exception as e: print(f"GUI error: {e}") return None def get_database_path_interactive() -> Path | None: """Get database path interactively.""" # Look for default file in current directory default_path = Path("localcv.db") if default_path.exists(): print(f"Found database: {default_path}") use_default = input("Use this database? (Y/n): ").strip().lower() if use_default in ("", "y", "yes"): return default_path # Try GUI file picker if HAS_GUI: print("\nOpening file picker...") path = get_database_path_gui() if path: print(f"Selected: {path}") return path print("No file selected.") # Fallback to manual entry print("\nEnter database path manually:") while True: path_str = input("Path to localcv.db (or press Enter to cancel): ").strip() if not path_str: return None path = Path(path_str) if path.exists(): return path print(f"ERROR: File not found: {path}") def parse_arguments() -> argparse.Namespace: """Parse command line arguments.""" parser = argparse.ArgumentParser( description="ComicVine SQLite Database Updater (Dated Version)", formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" This version creates dated database copies: localcv-YYYY-MM-DD.db Examples: # Interactive mode %(prog)s %(prog)s /path/to/localcv.db # Non-interactive mode - creates today's DB from yesterday's %(prog)s --non-interactive --db-dir /path/to/db/directory # With email notifications %(prog)s --non-interactive --db-dir /path/to/db --email-to your@email.com # Configuration file (comicvine_config.env in script directory) # The script will create this file when you save settings interactively # You can also create it manually with these settings: COMICVINE_API_KEY=your_api_key_here EMAIL_TO=your@email.com EMAIL_FROM=comicvine@yourdomain.com SMTP_HOST=mail.smtp2go.com SMTP_PORT=2525 SMTP_USER=your_smtp2go_username SMTP_PASS=your_smtp2go_password %(prog)s --non-interactive --db-dir /path/to/db/directory # Crontab entry (runs daily at 3 AM): 0 3 * * * /usr/bin/python3 /path/to/sqlite_cv_updater.py --non-interactive --db-dir /path/to/db >> /var/log/comicvine_sync.log 2>&1 How it works: 1. Looks for most recent localcv-YYYY-MM-DD.db before today 2. Copies it to localcv-{today}.db 3. Updates the new database with today's ComicVine data 4. Optionally sends email report with sync results """ ) parser.add_argument( "database", nargs="?", help="Path to a specific database file (interactive mode)" ) parser.add_argument( "--db", dest="db_path", help="Path to a specific database file (alternative to positional arg)" ) parser.add_argument( "--db-dir", dest="db_dir", help="Directory containing dated databases (for non-interactive mode)" ) parser.add_argument( "--api-key", help="ComicVine API key (or set COMICVINE_API_KEY environment variable)" ) parser.add_argument( "--start-date", help="Custom start date in YYYY-MM-DD format (default: automatic based on last sync)" ) parser.add_argument( "--non-interactive", action="store_true", help="Run without any user prompts (required for cron jobs)" ) parser.add_argument( "--quiet", action="store_true", help="Reduce output verbosity (only show errors and final stats)" ) parser.add_argument( "--no-copy", action="store_true", help="Don't create dated copy, update database in place" ) parser.add_argument( "--keep-days", type=int, default=7, help="Number of dated databases to keep (default: 7, set to 0 to disable cleanup)" ) # Email configuration parser.add_argument( "--email-to", help="Email address to send sync report to (or set EMAIL_TO environment variable)" ) parser.add_argument( "--email-from", help="Email address to send from (or set EMAIL_FROM environment variable)" ) parser.add_argument( "--smtp-host", help="SMTP server hostname (default: mail.smtp2go.com, or set SMTP_HOST)" ) parser.add_argument( "--smtp-port", type=int, help="SMTP server port (default: 2525, or set SMTP_PORT)" ) parser.add_argument( "--smtp-user", help="SMTP username (or set SMTP_USER environment variable)" ) parser.add_argument( "--smtp-pass", help="SMTP password (or set SMTP_PASS environment variable)" ) parser.add_argument( "--smtp-tls", action="store_true", help="Use STARTTLS for SMTP connection (default for port 587)" ) parser.add_argument( "--smtp-ssl", action="store_true", help="Use SSL for SMTP connection (default for port 465)" ) return parser.parse_args() def send_email_report( email_to: str, email_from: str, subject: str, log_contents: str, stats: dict[str, int], duration: str, smtp_host: str, smtp_port: int, smtp_user: str | None = None, smtp_pass: str | None = None, use_tls: bool = False, use_ssl: bool = False, ) -> bool: """Send email report with sync results.""" try: # Create message msg = MIMEMultipart('alternative') msg['Subject'] = subject msg['From'] = email_from msg['To'] = email_to # Create plain text version text_body = """ ComicVine Database Sync Report {separator} Duration: {duration} Statistics: ----------- API calls made: {api_calls} Publishers added: {publishers_added} Publishers updated: {publishers_updated} Persons added: {persons_added} Persons updated: {persons_updated} Volumes added: {volumes_added} Volumes updated: {volumes_updated} Issues added: {issues_added} Issues updated: {issues_updated} Full Log: {separator} {log_contents} """.format( separator='=' * 50, duration=duration, api_calls=stats['api_calls'], publishers_added=stats['publishers_added'], publishers_updated=stats['publishers_updated'], persons_added=stats['persons_added'], persons_updated=stats['persons_updated'], volumes_added=stats['volumes_added'], volumes_updated=stats['volumes_updated'], issues_added=stats['issues_added'], issues_updated=stats['issues_updated'], log_contents=log_contents ) # Create HTML version html_body = """