#!/usr/bin/env python3
"""
compose_check_updates.py
Checks running Docker Compose containers for tag updates WITHOUT pulling.
Optionally updates & restarts services listed in a file named 'images'.
- Update list: file with one name per line (container name like 'proj-web-1' OR service name like 'web').
- Update action (if listed and update exists): `docker compose pull <service>` then `docker compose up -d <service>`
executed in the Compose project working directory (derived from labels when possible).
Install:
pip install docker requests
Usage:
# Just check
./compose_check_updates.py
# Check + update services/containers listed in ./images
./compose_check_updates.py --images-file images
# If working directory cant be auto-detected from labels, specify fallback:
./compose_check_updates.py --images-file images --project-dir /path/to/compose
"""
import argparse
import re
import subprocess
import hashlib
import sys
from pathlib import Path
from typing import Optional, Tuple, Dict, Set
import requests
import docker
# --- HTTP/registry setup (for "check without pull") ---
DEFAULT_REGISTRY = "registry-1.docker.io"
DEFAULT_NAMESPACE = "library"
SESSION = requests.Session()
SESSION.headers.update({"User-Agent": "compose-update-checker/1.0", "Accept": "*/*"})
MANIFEST_ACCEPT = (
    "application/vnd.docker.distribution.manifest.v2+json,"
    "application/vnd.oci.image.manifest.v1+json,"
    "application/vnd.docker.distribution.manifest.list.v2+json,"
    "application/vnd.oci.image.index.v1+json"
)

# ---------- helpers: image reference parsing ----------
def normalize_image(ref: str) -> Tuple[str, str, str]:
    """
    Normalize an image reference into (registry, repository, tag_or_digest).

    - Supports 'nginx', 'nginx:1.25', 'docker.io/library/nginx:latest',
      'ghcr.io/owner/repo:tag', 'registry:5000/ns/repo@sha256:...'
    - Defaults to :latest if no tag/digest given.
    """
    if "@" in ref:
        name, digest = ref.split("@", 1)
        tag = "@" + digest
    elif ":" in ref:
        # A ':' marks a tag only if the part after it contains no '/'; otherwise it
        # is a registry port (e.g. 'registry:5000/ns/repo' carries no tag).
        name, maybe_tag = ref.rsplit(":", 1)
        if "/" not in maybe_tag:
            tag = maybe_tag
        else:
            name = ref
            tag = "latest"
    else:
        name = ref
        tag = "latest"

    parts = name.split("/")
    if len(parts) == 1:
        registry = DEFAULT_REGISTRY
        repository = f"{DEFAULT_NAMESPACE}/{parts[0]}"
    else:
        if "." in parts[0] or ":" in parts[0] or parts[0] == "localhost":
            registry = parts[0]
            repository = "/".join(parts[1:])
        else:
            registry = DEFAULT_REGISTRY
            repository = "/".join(parts)

    if registry in ("docker.io", "index.docker.io"):
        registry = DEFAULT_REGISTRY

    return registry, repository, tag
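# A few illustrative results for normalize_image() (assumed examples, derived from
# the parsing rules above rather than from any upstream documentation):
#
#     normalize_image("nginx")                 -> ("registry-1.docker.io", "library/nginx", "latest")
#     normalize_image("ghcr.io/owner/repo:v2") -> ("ghcr.io", "owner/repo", "v2")
#     normalize_image("registry:5000/ns/repo@sha256:abc...")
#                                              -> ("registry:5000", "ns/repo", "@sha256:abc...")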

def bearer_token(auth_header: str) -> Optional[str]:
    m = re.match(
        r'Bearer realm="(?P<realm>[^"]+)"(?:,service="(?P<service>[^"]+)")?(?:,scope="(?P<scope>[^"]+)")?',
        auth_header,
    )
    if not m:
        return None
    realm = m.group("realm")
    q = {}
    if m.group("service"):
        q["service"] = m.group("service")
    if m.group("scope"):
        q["scope"] = m.group("scope")
    r = SESSION.get(realm, params=q, timeout=20)
    r.raise_for_status()
    return r.json().get("token")

def remote_digest(registry: str, repository: str, tag_or_digest: str) -> Optional[str]:
    """
    Return the remote digest for repo:tag WITHOUT pulling.

    Handles single-arch manifests and multi-arch manifest lists (OCI index).

    Strategy:
    - HEAD first (lightweight, usually returns Docker-Content-Digest)
    - If 401, fetch Bearer token then retry
    - If still no header, GET and use header if present
    - As a last resort, sha256 of the exact response body bytes
    """
    if tag_or_digest.startswith("@sha256:"):
        return tag_or_digest[1:]

    url = f"https://{registry}/v2/{repository}/manifests/{tag_or_digest}"
    headers = {"Accept": MANIFEST_ACCEPT}

    def do(req_method):
        r = req_method(url, headers=headers, timeout=20)
        if r.status_code == 401 and "WWW-Authenticate" in r.headers:
            token = bearer_token(r.headers["WWW-Authenticate"])
            if token:
                headers["Authorization"] = f"Bearer {token}"
                r = req_method(url, headers=headers, timeout=20)
        return r

    # Try HEAD
    r = do(SESSION.head)
    if r.status_code == 404:
        return None
    r.raise_for_status()
    digest = r.headers.get("Docker-Content-Digest")
    if digest:
        return digest

    # Fall back to GET
    r = do(SESSION.get)
    if r.status_code == 404:
        return None
    r.raise_for_status()
    digest = r.headers.get("Docker-Content-Digest")
    if digest:
        return digest

    # Last resort: compute sha256 of the exact payload (works for v2/OCI content)
    # Note: Some proxies might reserialize; this still provides a consistent signal for comparison runs.
    body = r.content or b""
    if body:
        return "sha256:" + hashlib.sha256(body).hexdigest()
    return None
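# Roughly, the exchange remote_digest() relies on looks like this for Docker Hub
# (illustrative values only; other registries differ in realm/service details):
#
#     HEAD /v2/library/nginx/manifests/latest  -> 401, WWW-Authenticate: Bearer realm=..., scope="repository:library/nginx:pull"
#     GET  <realm>?service=...&scope=...       -> {"token": "..."}
#     HEAD /v2/library/nginx/manifests/latest  -> 200, Docker-Content-Digest: sha256:<64 hex chars>
#
# That digest is what gets compared against the local image's RepoDigest below.
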
# ---------- local docker helpers ----------
def local_repo_digest(client, image_ref: str) -> Optional[str]:
    """
    Try to find a local RepoDigest for the given image tag/ref.
    Returns 'sha256:...' or None.
    """
    try:
        img = client.images.get(image_ref)
    except docker.errors.ImageNotFound:
        return None
    repo_digests = img.attrs.get("RepoDigests") or []
    for d in repo_digests:
        if "@sha256:" in d:
            return d.split("@", 1)[1]
    return None
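# For context, a pulled image's RepoDigests attribute typically looks like
# ["nginx@sha256:<64 hex chars>"] (illustrative value); the part after '@' is what
# local_repo_digest() returns and what gets compared to the registry digest.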

def read_allowlist(path: Optional[str]) -> Set[str]:
    if not path:
        return set()
    p = Path(path)
    if not p.exists():
        print(f"WARNING: images file '{path}' not found; continuing in check-only mode.", file=sys.stderr)
        return set()
    out = set()
    for line in p.read_text().splitlines():
        name = line.strip()
        if name and not name.startswith("#"):
            out.add(name)
    return out

def run_compose(cmd: list[str], cwd: Optional[Path]) -> subprocess.CompletedProcess:
    return subprocess.run(cmd, cwd=str(cwd) if cwd else None, text=True, capture_output=True)

def main():
    ap = argparse.ArgumentParser(description="Check Compose containers for image updates without pulling.")
    ap.add_argument("--images-file", default=None, help="Path to file listing container or service names to update.")
    ap.add_argument("--project-dir", default=None, help="Fallback compose project dir if label not available.")
    ap.add_argument("--dry-run", action="store_true", help="Show update actions but do not change anything.")
    args = ap.parse_args()

    allow = read_allowlist(args.images_file)
    fallback_dir = Path(args.project_dir).resolve() if args.project_dir else None

    try:
        client = docker.from_env()
    except Exception as e:
        print(f"Failed to connect to Docker daemon: {e}", file=sys.stderr)
        sys.exit(2)

    containers = client.containers.list()  # running only
    if not containers:
        print("No running containers.")
        return

    print(f"{'CONTAINER':28} {'SERVICE':16} {'IMAGE':46} {'STATUS':18} DETAILS")

    # We'll group info and remember compose project metadata per container
    updates_to_apply: Dict[str, dict] = {}
    for c in containers:
        labels = c.labels or {}
        project = labels.get("com.docker.compose.project")
        service = labels.get("com.docker.compose.service")
        # working dir & config files are present in newer compose versions; use if available
        proj_workdir = labels.get("com.docker.compose.project.working_dir")
        proj_config_files = labels.get("com.docker.compose.project.config_files")  # comma-separated
        compose_dir = Path(proj_workdir).resolve() if proj_workdir else fallback_dir

        # best-effort image ref for lookup
        image_ref = (c.image.tags[0] if c.image.tags else (c.image.attrs.get("RepoDigests", ["<untagged>"])[0]))

        # compute local digest (None for locally-built/unpinned)
        local_d = local_repo_digest(client, image_ref)
        if local_d is None:
            print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} no local digest (locally built/unpinned)")
            continue

        # compute remote digest from registry manifest (no pull)
        reg, repo, tag = normalize_image(image_ref)
        try:
            remote_d = remote_digest(reg, repo, tag)
        except requests.HTTPError as he:
            print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} registry error: {he}")
            continue
        except requests.RequestException as rexc:
            print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} network error: {rexc}")
            continue

        if not remote_d:
            print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} could not obtain remote digest")
            continue

        if local_d == remote_d:
            print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UP-TO-DATE':18}")
        else:
            print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UPDATE AVAILABLE':18} local={local_d[:18]}… remote={remote_d[:18]}")
            # Should we update this one?
            listed = (
                (c.name in allow) or
                (service in allow if service else False)
            )
            if listed and service and project:
                updates_to_apply[c.name] = {
                    "service": service,
                    "project": project,
                    "compose_dir": compose_dir,
                    "config_files": proj_config_files.split(",") if proj_config_files else None,
                }

    if not updates_to_apply:
        if allow:
            print("\nNo listed containers/services require updates (or labels/project info not found).")
        return
    # Apply updates via docker compose for each affected service
    print("\nPlanned updates:")
    for name, meta in updates_to_apply.items():
        svc = meta["service"]
        cdir = meta["compose_dir"] or Path.cwd()
        cfgs = meta["config_files"]

        # Build -f args; resolve relative files against project dir
        cfg_args: list[str] = []
        if cfgs:
            for f in cfgs:
                p = Path(f)
                if not p.is_absolute():
                    p = (cdir / p).resolve()
                cfg_args += ["-f", str(p)]

        base = ["docker", "compose", "--project-directory", str(cdir)]
        cmd_pull = [*base, *cfg_args, "pull", svc]
        cmd_up = [*base, *cfg_args, "up", "-d", "--no-deps", svc]

        print(f" - {name}: (service '{svc}')")
        if args.dry_run:
            print(f"   would run: {' '.join(cmd_pull)} (cwd={cdir})")
            print(f"   would run: {' '.join(cmd_up)} (cwd={cdir})")
            continue

        res = run_compose(cmd_pull, cdir)
        if res.returncode != 0:
            print(f"   pull failed: {res.stderr.strip() or res.stdout.strip()}")
            continue
        else:
            print(f"   pull ok: {res.stdout.strip().splitlines()[-1] if res.stdout else 'done'}")

        res2 = run_compose(cmd_up, cdir)
        if res2.returncode != 0:
            print(f"   up failed: {res2.stderr.strip() or res2.stdout.strip()}")
        else:
            print(f"   up ok: {res2.stdout.strip().splitlines()[-1] if res2.stdout else 'recreated'}")
if __name__ == "__main__":
main()