#!/usr/bin/env python3
"""
compose_check_updates.py

Checks running Docker Compose containers for tag updates WITHOUT pulling.
Optionally updates & restarts services listed in a file named 'images'.

- Update list: file with one name per line (container name like 'proj-web-1'
  OR service name like 'web'). Blank lines and lines starting with '#' are
  ignored.
- Update action (if listed and update exists):
      docker compose pull SERVICE
  then
      docker compose up -d --no-deps SERVICE
  executed in the Compose project working directory (derived from labels when
  possible).

Install:
    pip install docker requests

Usage:
    # Just check
    ./compose_check_updates.py

    # Check + update services/containers listed in ./images
    ./compose_check_updates.py --images-file images

    # If working directory can’t be auto-detected from labels, specify fallback:
    ./compose_check_updates.py --images-file images --project-dir /path/to/compose
"""

import argparse
import re
import shlex
import subprocess
import hashlib
import sys
from pathlib import Path
from typing import Optional, Tuple, Dict, Set

import requests
import docker

# --- HTTP/registry setup (for "check without pull") ---
DEFAULT_REGISTRY = "registry-1.docker.io"
DEFAULT_NAMESPACE = "library"

SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "compose-update-checker/1.0", "Accept": "*/*"})

MANIFEST_ACCEPT = (
    "application/vnd.docker.distribution.manifest.v2+json,"
    "application/vnd.oci.image.manifest.v1+json,"
    "application/vnd.docker.distribution.manifest.list.v2+json,"
    "application/vnd.oci.image.index.v1+json"
)

# key="value" pairs inside a WWW-Authenticate challenge, in any order.
_AUTH_PARAM_RE = re.compile(r'(\w+)="([^"]*)"')


# ---------- helpers: image reference parsing ----------
def normalize_image(ref: str) -> Tuple[str, str, str]:
    """
    Normalize an image reference into (registry, repository, tag_or_digest).

    - Supports 'nginx', 'nginx:1.25', 'docker.io/library/nginx:latest',
      'ghcr.io/owner/repo:tag', 'registry:5000/ns/repo@sha256:...'
    - Defaults to :latest if no tag/digest given.
    - A digest is returned with a leading '@' (e.g. '@sha256:...') so callers
      can tell it apart from a tag; when both a tag and a digest are present
      the digest wins and the tag is dropped from the repository name.
    """
    name, at, digest = ref.partition("@")

    # A colon only separates a tag when it comes AFTER the last slash;
    # otherwise it belongs to a 'host:port' registry prefix.
    tag = ""
    colon = name.rfind(":")
    if colon > name.rfind("/"):
        name, tag = name[:colon], name[colon + 1:]

    if at:
        tag_or_digest = "@" + digest
    else:
        tag_or_digest = tag or "latest"

    parts = name.split("/")
    if len(parts) == 1:
        registry = DEFAULT_REGISTRY
        repository = f"{DEFAULT_NAMESPACE}/{parts[0]}"
    elif "." in parts[0] or ":" in parts[0] or parts[0] == "localhost":
        # First component looks like a hostname, so treat it as the registry.
        registry = parts[0]
        repository = "/".join(parts[1:])
    else:
        registry = DEFAULT_REGISTRY
        repository = "/".join(parts)

    if registry in ("docker.io", "index.docker.io"):
        registry = DEFAULT_REGISTRY

    return registry, repository, tag_or_digest


def bearer_token(auth_header: str) -> Optional[str]:
    """
    Exchange a 'WWW-Authenticate: Bearer ...' challenge for a token.

    The challenge parameters (realm, service, scope) are read as key="value"
    pairs in whatever order they appear. Returns None when the header is not a
    Bearer challenge or carries no realm. Raises requests.HTTPError when the
    token endpoint answers with an error status.
    """
    scheme, _, params = auth_header.strip().partition(" ")
    if scheme.lower() != "bearer":
        return None

    fields = dict(_AUTH_PARAM_RE.findall(params))
    realm = fields.get("realm")
    if not realm:
        return None

    query = {key: fields[key] for key in ("service", "scope") if fields.get(key)}
    r = SESSION.get(realm, params=query, timeout=20)
    r.raise_for_status()
    payload = r.json()
    # Token servers may return the token under either key.
    return payload.get("token") or payload.get("access_token")


def remote_digest(registry: str, repository: str, tag_or_digest: str) -> Optional[str]:
    """
    Return the remote digest for repo:tag WITHOUT pulling.
    Handles single-arch manifests and multi-arch manifest lists (OCI index).

    Strategy:
      - HEAD first (lightweight, usually returns Docker-Content-Digest)
      - If 401, fetch Bearer token then retry
      - If still no header, GET and use header if present
      - As a last resort, sha256 of the exact response body bytes

    Returns None on 404 or when no digest can be determined. A reference that
    is already a digest ('@algo:hex') is returned as-is without any request.
    Raises requests.HTTPError / requests.RequestException on other failures.
    """
    if tag_or_digest.startswith("@"):
        return tag_or_digest[1:]

    url = f"https://{registry}/v2/{repository}/manifests/{tag_or_digest}"
    headers = {"Accept": MANIFEST_ACCEPT}

    def do(req_method):
        r = req_method(url, headers=headers, timeout=20)
        if r.status_code == 401 and "WWW-Authenticate" in r.headers:
            token = bearer_token(r.headers["WWW-Authenticate"])
            if token:
                # Stored in the shared dict so the GET fallback reuses it.
                headers["Authorization"] = f"Bearer {token}"
                r = req_method(url, headers=headers, timeout=20)
        return r

    response = None
    for method in (SESSION.head, SESSION.get):
        response = do(method)
        if response.status_code == 404:
            return None
        response.raise_for_status()
        digest = response.headers.get("Docker-Content-Digest")
        if digest:
            return digest

    # Last resort: compute sha256 of the exact payload (works for v2/OCI content)
    # Note: Some proxies might reserialize; this still provides a consistent
    # signal for comparison runs.
    body = response.content or b""
    if body:
        return "sha256:" + hashlib.sha256(body).hexdigest()
    return None


# ---------- local docker helpers ----------
def local_repo_digest(client, image_ref: str) -> Optional[str]:
    """
    Try to find a local RepoDigest for the given image tag/ref.
    Returns 'sha256:...' or None.

    An image can carry RepoDigests for several repositories; the entry whose
    registry/repository matches image_ref is preferred, and the first sha256
    entry is used as a fallback when none matches.
    """
    if not image_ref:
        return None
    try:
        img = client.images.get(image_ref)
    except docker.errors.ImageNotFound:
        return None

    wanted = normalize_image(image_ref)[:2]
    fallback = None
    for entry in img.attrs.get("RepoDigests") or []:
        name, sep, digest = entry.partition("@")
        if not sep or not digest.startswith("sha256:"):
            continue
        if normalize_image(name)[:2] == wanted:
            return digest
        if fallback is None:
            fallback = digest
    return fallback


def read_allowlist(path: Optional[str]) -> Set[str]:
    """
    Read the update list: one container or service name per line.

    Blank lines and lines starting with '#' are skipped. Returns an empty set
    (check-only mode) when no path is given or the path is not a regular file.
    """
    if not path:
        return set()
    p = Path(path)
    if not p.is_file():
        print(f"WARNING: images file '{path}' not found; continuing in check-only mode.", file=sys.stderr)
        return set()
    names = (line.strip() for line in p.read_text(encoding="utf-8").splitlines())
    return {name for name in names if name and not name.startswith("#")}


def run_compose(cmd: list[str], cwd: Optional[Path]) -> subprocess.CompletedProcess:
    """
    Run a command, capturing stdout/stderr as text.

    A failure to launch the process at all (e.g. executable or working
    directory missing) is reported as a CompletedProcess with return code 127
    and the error text in stderr, so callers handle it like any other failure.
    """
    try:
        return subprocess.run(cmd, cwd=str(cwd) if cwd else None, text=True, capture_output=True)
    except OSError as exc:
        return subprocess.CompletedProcess(cmd, 127, "", str(exc))


def _container_image_ref(container) -> str:
    """Best-effort image reference for a container ('' if none can be found)."""
    try:
        image = container.image
    except docker.errors.ImageNotFound:
        image = None
    if image is not None:
        if image.tags:
            return image.tags[0]
        repo_digests = image.attrs.get("RepoDigests") or []
        if repo_digests:
            return repo_digests[0]
    # Fall back to the reference recorded in the container's inspect data.
    config = (container.attrs or {}).get("Config") or {}
    return config.get("Image") or ""


def _print_row(name: str, service: Optional[str], image_ref: str, status: str, details: str = "") -> None:
    """Print one line of the status table, aligned with the header columns."""
    svc = service or ""
    row = f"{name:28} {svc:16} {image_ref:46} {status:18}"
    print(f"{row} {details}" if details else row)


def _last_line(text: Optional[str], default: str) -> str:
    """Return the last non-blank-trimmed line of text, or default if there is none."""
    lines = (text or "").strip().splitlines()
    return lines[-1] if lines else default


def main():
    """Check running containers for image updates and optionally apply them."""
    ap = argparse.ArgumentParser(description="Check Compose containers for image updates without pulling.")
    ap.add_argument("--images-file", default=None, help="Path to file listing container or service names to update.")
    ap.add_argument("--project-dir", default=None, help="Fallback compose project dir if label not available.")
    ap.add_argument("--dry-run", action="store_true", help="Show update actions but do not change anything.")
    args = ap.parse_args()

    allow = read_allowlist(args.images_file)
    fallback_dir = Path(args.project_dir).resolve() if args.project_dir else None

    # Top-level boundary: the first real daemon round-trip is the list() call,
    # so it belongs inside the same guard as from_env().
    try:
        client = docker.from_env()
        containers = client.containers.list()  # running only
    except Exception as e:
        print(f"Failed to connect to Docker daemon: {e}", file=sys.stderr)
        sys.exit(2)

    if not containers:
        print("No running containers.")
        return

    print(f"{'CONTAINER':28} {'SERVICE':16} {'IMAGE':46} {'STATUS':18} DETAILS")

    # Keyed by (project, service) so a scaled service is pulled/recreated once,
    # no matter how many of its containers are running.
    updates_to_apply: Dict[Tuple[str, str], dict] = {}

    for c in containers:
        labels = c.labels or {}
        project = labels.get("com.docker.compose.project")
        service = labels.get("com.docker.compose.service")
        # working dir & config files are present in newer compose versions; use if available
        proj_workdir = labels.get("com.docker.compose.project.working_dir")
        proj_config_files = labels.get("com.docker.compose.project.config_files")  # comma-separated

        compose_dir = Path(proj_workdir).resolve() if proj_workdir else fallback_dir

        # best-effort image ref for lookup
        image_ref = _container_image_ref(c)

        # compute local digest (None for locally-built/unpinned)
        local_d = local_repo_digest(client, image_ref)
        if local_d is None:
            _print_row(c.name, service, image_ref, "UNKNOWN", "no local digest (locally built/unpinned)")
            continue

        # compute remote digest from registry manifest (no pull)
        reg, repo, tag = normalize_image(image_ref)
        try:
            remote_d = remote_digest(reg, repo, tag)
        except requests.HTTPError as he:
            _print_row(c.name, service, image_ref, "UNKNOWN", f"registry error: {he}")
            continue
        except requests.RequestException as rexc:
            _print_row(c.name, service, image_ref, "UNKNOWN", f"network error: {rexc}")
            continue

        if not remote_d:
            _print_row(c.name, service, image_ref, "UNKNOWN", "could not obtain remote digest")
            continue

        if local_d == remote_d:
            _print_row(c.name, service, image_ref, "UP-TO-DATE")
            continue

        _print_row(
            c.name, service, image_ref, "UPDATE AVAILABLE",
            f"local={local_d[:18]}… remote={remote_d[:18]}…",
        )

        # Should we update this one?
        listed = c.name in allow or bool(service and service in allow)
        if listed and service and project:
            entry = updates_to_apply.setdefault(
                (project, service),
                {
                    "service": service,
                    "project": project,
                    "compose_dir": compose_dir,
                    "config_files": proj_config_files.split(",") if proj_config_files else None,
                    "containers": [],
                },
            )
            entry["containers"].append(c.name)

    if not updates_to_apply:
        if allow:
            print("\nNo listed containers/services require updates (or labels/project info not found).")
        return

    # Apply updates via docker compose for each affected service
    print("\nPlanned updates:")
    for meta in updates_to_apply.values():
        svc = meta["service"]
        cdir = meta["compose_dir"] or Path.cwd()
        cfgs = meta["config_files"]

        # Build -f args; resolve relative files against project dir
        cfg_args: list[str] = []
        for f in cfgs or []:
            p = Path(f)
            if not p.is_absolute():
                p = (cdir / p).resolve()
            cfg_args += ["-f", str(p)]

        # Pass the project name from the container label so compose targets the
        # existing project even if it was started with a custom name.
        base = ["docker", "compose", "-p", meta["project"], "--project-directory", str(cdir)]
        cmd_pull = [*base, *cfg_args, "pull", svc]
        cmd_up = [*base, *cfg_args, "up", "-d", "--no-deps", svc]

        print(f"  - {', '.join(meta['containers'])}: (service '{svc}')")
        if args.dry_run:
            print(f"    would run: {shlex.join(cmd_pull)}  (cwd={cdir})")
            print(f"    would run: {shlex.join(cmd_up)}    (cwd={cdir})")
            continue

        res = run_compose(cmd_pull, cdir)
        if res.returncode != 0:
            print(f"    pull failed: {res.stderr.strip() or res.stdout.strip()}")
            continue
        print(f"    pull ok: {_last_line(res.stdout, 'done')}")

        res2 = run_compose(cmd_up, cdir)
        if res2.returncode != 0:
            print(f"    up failed: {res2.stderr.strip() or res2.stdout.strip()}")
        else:
            print(f"    up ok: {_last_line(res2.stdout, 'recreated')}")


if __name__ == "__main__":
    main()