331 lines
12 KiB
Python
Executable File
331 lines
12 KiB
Python
Executable File
#!/usr/bin/env python3
|
||
"""
|
||
compose_check_updates.py
|
||
|
||
Checks running Docker Compose containers for tag updates WITHOUT pulling.
|
||
Optionally updates & restarts services listed in a file named 'images'.
|
||
|
||
- Update list: file with one name per line (container name like 'proj-web-1' OR service name like 'web').
|
||
- Update action (if listed and update exists): `docker compose pull <service>` then `docker compose up -d <service>`
|
||
executed in the Compose project working directory (derived from labels when possible).
|
||
|
||
Install:
|
||
pip install docker requests
|
||
|
||
Usage:
|
||
# Just check
|
||
./compose_check_updates.py
|
||
|
||
# Check + update services/containers listed in ./images
|
||
./compose_check_updates.py --images-file images
|
||
|
||
# If working directory can’t be auto-detected from labels, specify fallback:
|
||
./compose_check_updates.py --images-file images --project-dir /path/to/compose
|
||
"""
|
||
|
||
import argparse
|
||
import re
|
||
import subprocess
|
||
import hashlib
|
||
import sys
|
||
from pathlib import Path
|
||
from typing import Optional, Tuple, Dict, Set
|
||
|
||
import requests
|
||
import docker
|
||
|
||
# --- HTTP/registry setup (for "check without pull") ---
|
||
|
||
DEFAULT_REGISTRY = "registry-1.docker.io"
|
||
DEFAULT_NAMESPACE = "library"
|
||
SESSION = requests.Session()
|
||
SESSION.headers.update({"User-Agent": "compose-update-checker/1.0", "Accept": "*/*"})
|
||
|
||
MANIFEST_ACCEPT = (
|
||
"application/vnd.docker.distribution.manifest.v2+json,"
|
||
"application/vnd.oci.image.manifest.v1+json,"
|
||
"application/vnd.docker.distribution.manifest.list.v2+json,"
|
||
"application/vnd.oci.image.index.v1+json"
|
||
)
|
||
|
||
|
||
# ---------- helpers: image reference parsing ----------
|
||
|
||
def normalize_image(ref: str) -> Tuple[str, str, str]:
|
||
"""
|
||
Normalize an image reference into (registry, repository, tag_or_digest).
|
||
- Supports 'nginx', 'nginx:1.25', 'docker.io/library/nginx:latest',
|
||
'ghcr.io/owner/repo:tag', 'registry:5000/ns/repo@sha256:...'
|
||
- Defaults to :latest if no tag/digest given.
|
||
"""
|
||
if "@" in ref:
|
||
name, digest = ref.split("@", 1)
|
||
tag = "@" + digest
|
||
else:
|
||
if ":" in ref and "/" in ref.split(":")[0]:
|
||
# host[:port]/repo[:tag] OR host[:port]/repo
|
||
name, *maybe_tag = ref.rsplit(":", 1)
|
||
if maybe_tag and "/" not in maybe_tag[0]:
|
||
tag = maybe_tag[0]
|
||
else:
|
||
name = ref
|
||
tag = "latest"
|
||
elif ":" in ref:
|
||
name, tag = ref.rsplit(":", 1)
|
||
else:
|
||
name = ref
|
||
tag = "latest"
|
||
|
||
parts = name.split("/")
|
||
if len(parts) == 1:
|
||
registry = DEFAULT_REGISTRY
|
||
repository = f"{DEFAULT_NAMESPACE}/{parts[0]}"
|
||
else:
|
||
if "." in parts[0] or ":" in parts[0] or parts[0] == "localhost":
|
||
registry = parts[0]
|
||
repository = "/".join(parts[1:])
|
||
else:
|
||
registry = DEFAULT_REGISTRY
|
||
repository = "/".join(parts)
|
||
|
||
if registry in ("docker.io", "index.docker.io"):
|
||
registry = DEFAULT_REGISTRY
|
||
|
||
return registry, repository, tag
|
||
|
||
|
||
def bearer_token(auth_header: str) -> Optional[str]:
|
||
m = re.match(
|
||
r'Bearer realm="(?P<realm>[^"]+)"(?:,service="(?P<service>[^"]+)")?(?:,scope="(?P<scope>[^"]+)")?',
|
||
auth_header,
|
||
)
|
||
if not m:
|
||
return None
|
||
realm = m.group("realm")
|
||
q = {}
|
||
if m.group("service"):
|
||
q["service"] = m.group("service")
|
||
if m.group("scope"):
|
||
q["scope"] = m.group("scope")
|
||
r = SESSION.get(realm, params=q, timeout=20)
|
||
r.raise_for_status()
|
||
return r.json().get("token")
|
||
|
||
|
||
def remote_digest(registry: str, repository: str, tag_or_digest: str) -> Optional[str]:
|
||
"""
|
||
Return the remote digest for repo:tag WITHOUT pulling.
|
||
Handles single-arch manifests and multi-arch manifest lists (OCI index).
|
||
Strategy:
|
||
- HEAD first (lightweight, usually returns Docker-Content-Digest)
|
||
- If 401, fetch Bearer token then retry
|
||
- If still no header, GET and use header if present
|
||
- As a last resort, sha256 of the exact response body bytes
|
||
"""
|
||
if tag_or_digest.startswith("@sha256:"):
|
||
return tag_or_digest[1:]
|
||
|
||
url = f"https://{registry}/v2/{repository}/manifests/{tag_or_digest}"
|
||
headers = {"Accept": MANIFEST_ACCEPT}
|
||
|
||
def do(req_method):
|
||
r = req_method(url, headers=headers, timeout=20)
|
||
if r.status_code == 401 and "WWW-Authenticate" in r.headers:
|
||
token = bearer_token(r.headers["WWW-Authenticate"])
|
||
if token:
|
||
headers["Authorization"] = f"Bearer {token}"
|
||
r = req_method(url, headers=headers, timeout=20)
|
||
return r
|
||
|
||
# Try HEAD
|
||
r = do(SESSION.head)
|
||
if r.status_code == 404:
|
||
return None
|
||
r.raise_for_status()
|
||
digest = r.headers.get("Docker-Content-Digest")
|
||
if digest:
|
||
return digest
|
||
|
||
# Fall back to GET
|
||
r = do(SESSION.get)
|
||
if r.status_code == 404:
|
||
return None
|
||
r.raise_for_status()
|
||
|
||
digest = r.headers.get("Docker-Content-Digest")
|
||
if digest:
|
||
return digest
|
||
|
||
# Last resort: compute sha256 of the exact payload (works for v2/OCI content)
|
||
# Note: Some proxies might reserialize; this still provides a consistent signal for comparison runs.
|
||
body = r.content or b""
|
||
if body:
|
||
return "sha256:" + hashlib.sha256(body).hexdigest()
|
||
|
||
return None
|
||
|
||
# ---------- local docker helpers ----------
|
||
|
||
def local_repo_digest(client, image_ref: str) -> Optional[str]:
|
||
"""
|
||
Try to find a local RepoDigest for the given image tag/ref.
|
||
Returns 'sha256:...' or None.
|
||
"""
|
||
try:
|
||
img = client.images.get(image_ref)
|
||
except docker.errors.ImageNotFound:
|
||
return None
|
||
|
||
repo_digests = img.attrs.get("RepoDigests") or []
|
||
for d in repo_digests:
|
||
if "@sha256:" in d:
|
||
return d.split("@", 1)[1]
|
||
return None
|
||
|
||
|
||
def read_allowlist(path: Optional[str]) -> Set[str]:
|
||
if not path:
|
||
return set()
|
||
p = Path(path)
|
||
if not p.exists():
|
||
print(f"WARNING: images file '{path}' not found; continuing in check-only mode.", file=sys.stderr)
|
||
return set()
|
||
out = set()
|
||
for line in p.read_text().splitlines():
|
||
name = line.strip()
|
||
if name and not name.startswith("#"):
|
||
out.add(name)
|
||
return out
|
||
|
||
|
||
def run_compose(cmd: list[str], cwd: Optional[Path]) -> subprocess.CompletedProcess:
|
||
return subprocess.run(cmd, cwd=str(cwd) if cwd else None, text=True, capture_output=True)
|
||
|
||
def main():
|
||
ap = argparse.ArgumentParser(description="Check Compose containers for image updates without pulling.")
|
||
ap.add_argument("--images-file", default=None, help="Path to file listing container or service names to update.")
|
||
ap.add_argument("--project-dir", default=None, help="Fallback compose project dir if label not available.")
|
||
ap.add_argument("--dry-run", action="store_true", help="Show update actions but do not change anything.")
|
||
args = ap.parse_args()
|
||
|
||
allow = read_allowlist(args.images_file)
|
||
fallback_dir = Path(args.project_dir).resolve() if args.project_dir else None
|
||
|
||
try:
|
||
client = docker.from_env()
|
||
except Exception as e:
|
||
print(f"Failed to connect to Docker daemon: {e}", file=sys.stderr)
|
||
sys.exit(2)
|
||
|
||
containers = client.containers.list() # running only
|
||
if not containers:
|
||
print("No running containers.")
|
||
return
|
||
|
||
print(f"{'CONTAINER':28} {'SERVICE':16} {'IMAGE':46} {'STATUS':18} DETAILS")
|
||
|
||
# We’ll group info and remember compose project metadata per container
|
||
updates_to_apply: Dict[str, dict] = {}
|
||
|
||
for c in containers:
|
||
labels = c.labels or {}
|
||
project = labels.get("com.docker.compose.project")
|
||
service = labels.get("com.docker.compose.service")
|
||
# working dir & config files are present in newer compose versions; use if available
|
||
proj_workdir = labels.get("com.docker.compose.project.working_dir")
|
||
proj_config_files = labels.get("com.docker.compose.project.config_files") # comma-separated
|
||
compose_dir = Path(proj_workdir).resolve() if proj_workdir else fallback_dir
|
||
|
||
# best-effort image ref for lookup
|
||
image_ref = (c.image.tags[0] if c.image.tags else (c.image.attrs.get("RepoDigests", ["<untagged>"])[0]))
|
||
|
||
# compute local digest (None for locally-built/unpinned)
|
||
local_d = local_repo_digest(client, image_ref)
|
||
if local_d is None:
|
||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} no local digest (locally built/unpinned)")
|
||
continue
|
||
|
||
# compute remote digest from registry manifest (no pull)
|
||
reg, repo, tag = normalize_image(image_ref)
|
||
try:
|
||
remote_d = remote_digest(reg, repo, tag)
|
||
except requests.HTTPError as he:
|
||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} registry error: {he}")
|
||
continue
|
||
except requests.RequestException as rexc:
|
||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} network error: {rexc}")
|
||
continue
|
||
|
||
if not remote_d:
|
||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} could not obtain remote digest")
|
||
continue
|
||
|
||
if local_d == remote_d:
|
||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UP-TO-DATE':18}")
|
||
else:
|
||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UPDATE AVAILABLE':18} local={local_d[:18]}… remote={remote_d[:18]}…")
|
||
|
||
# Should we update this one?
|
||
listed = (
|
||
(c.name in allow) or
|
||
(service in allow if service else False)
|
||
)
|
||
if listed and service and project:
|
||
updates_to_apply[c.name] = {
|
||
"service": service,
|
||
"project": project,
|
||
"compose_dir": compose_dir,
|
||
"config_files": proj_config_files.split(",") if proj_config_files else None,
|
||
}
|
||
|
||
if not updates_to_apply:
|
||
if allow:
|
||
print("\nNo listed containers/services require updates (or labels/project info not found).")
|
||
return
|
||
|
||
# Apply updates via docker compose for each affected service
|
||
print("\nPlanned updates:")
|
||
for name, meta in updates_to_apply.items():
|
||
svc = meta["service"]
|
||
cdir = meta["compose_dir"] or Path.cwd()
|
||
cfgs = meta["config_files"]
|
||
|
||
# Build -f args; resolve relative files against project dir
|
||
cfg_args: list[str] = []
|
||
if cfgs:
|
||
for f in cfgs:
|
||
p = Path(f)
|
||
if not p.is_absolute():
|
||
p = (cdir / p).resolve()
|
||
cfg_args += ["-f", str(p)]
|
||
|
||
base = ["docker", "compose",
|
||
"--project-directory", str(cdir)]
|
||
cmd_pull = [*base, *cfg_args, "pull", svc]
|
||
cmd_up = [*base, *cfg_args, "up", "-d", "--no-deps", svc]
|
||
|
||
print(f" - {name}: (service '{svc}')")
|
||
if args.dry_run:
|
||
print(f" would run: {' '.join(cmd_pull)} (cwd={cdir})")
|
||
print(f" would run: {' '.join(cmd_up)} (cwd={cdir})")
|
||
continue
|
||
|
||
res = run_compose(cmd_pull, cdir)
|
||
if res.returncode != 0:
|
||
print(f" pull failed: {res.stderr.strip() or res.stdout.strip()}")
|
||
continue
|
||
else:
|
||
print(f" pull ok: {res.stdout.strip().splitlines()[-1] if res.stdout else 'done'}")
|
||
|
||
res2 = run_compose(cmd_up, cdir)
|
||
if res2.returncode != 0:
|
||
print(f" up failed: {res2.stderr.strip() or res2.stdout.strip()}")
|
||
else:
|
||
print(f" up ok: {res2.stdout.strip().splitlines()[-1] if res2.stdout else 'recreated'}")
|
||
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|
||
|