Added script to update docker compose containers
This commit is contained in:
330
update_docker.py
Executable file
330
update_docker.py
Executable file
@@ -0,0 +1,330 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
compose_check_updates.py
|
||||
|
||||
Checks running Docker Compose containers for tag updates WITHOUT pulling.
|
||||
Optionally updates & restarts services listed in a file named 'images'.
|
||||
|
||||
- Update list: file with one name per line (container name like 'proj-web-1' OR service name like 'web').
|
||||
- Update action (if listed and update exists): `docker compose pull <service>` then `docker compose up -d <service>`
|
||||
executed in the Compose project working directory (derived from labels when possible).
|
||||
|
||||
Install:
|
||||
pip install docker requests
|
||||
|
||||
Usage:
|
||||
# Just check
|
||||
./compose_check_updates.py
|
||||
|
||||
# Check + update services/containers listed in ./images
|
||||
./compose_check_updates.py --images-file images
|
||||
|
||||
# If working directory can’t be auto-detected from labels, specify fallback:
|
||||
./compose_check_updates.py --images-file images --project-dir /path/to/compose
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import subprocess
|
||||
import hashlib
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Optional, Tuple, Dict, Set
|
||||
|
||||
import requests
|
||||
import docker
|
||||
|
||||
# --- HTTP/registry setup (for "check without pull") ---
|
||||
|
||||
DEFAULT_REGISTRY = "registry-1.docker.io"
|
||||
DEFAULT_NAMESPACE = "library"
|
||||
SESSION = requests.Session()
|
||||
SESSION.headers.update({"User-Agent": "compose-update-checker/1.0", "Accept": "*/*"})
|
||||
|
||||
MANIFEST_ACCEPT = (
|
||||
"application/vnd.docker.distribution.manifest.v2+json,"
|
||||
"application/vnd.oci.image.manifest.v1+json,"
|
||||
"application/vnd.docker.distribution.manifest.list.v2+json,"
|
||||
"application/vnd.oci.image.index.v1+json"
|
||||
)
|
||||
|
||||
|
||||
# ---------- helpers: image reference parsing ----------
|
||||
|
||||
def normalize_image(ref: str) -> Tuple[str, str, str]:
|
||||
"""
|
||||
Normalize an image reference into (registry, repository, tag_or_digest).
|
||||
- Supports 'nginx', 'nginx:1.25', 'docker.io/library/nginx:latest',
|
||||
'ghcr.io/owner/repo:tag', 'registry:5000/ns/repo@sha256:...'
|
||||
- Defaults to :latest if no tag/digest given.
|
||||
"""
|
||||
if "@" in ref:
|
||||
name, digest = ref.split("@", 1)
|
||||
tag = "@" + digest
|
||||
else:
|
||||
if ":" in ref and "/" in ref.split(":")[0]:
|
||||
# host[:port]/repo[:tag] OR host[:port]/repo
|
||||
name, *maybe_tag = ref.rsplit(":", 1)
|
||||
if maybe_tag and "/" not in maybe_tag[0]:
|
||||
tag = maybe_tag[0]
|
||||
else:
|
||||
name = ref
|
||||
tag = "latest"
|
||||
elif ":" in ref:
|
||||
name, tag = ref.rsplit(":", 1)
|
||||
else:
|
||||
name = ref
|
||||
tag = "latest"
|
||||
|
||||
parts = name.split("/")
|
||||
if len(parts) == 1:
|
||||
registry = DEFAULT_REGISTRY
|
||||
repository = f"{DEFAULT_NAMESPACE}/{parts[0]}"
|
||||
else:
|
||||
if "." in parts[0] or ":" in parts[0] or parts[0] == "localhost":
|
||||
registry = parts[0]
|
||||
repository = "/".join(parts[1:])
|
||||
else:
|
||||
registry = DEFAULT_REGISTRY
|
||||
repository = "/".join(parts)
|
||||
|
||||
if registry in ("docker.io", "index.docker.io"):
|
||||
registry = DEFAULT_REGISTRY
|
||||
|
||||
return registry, repository, tag
|
||||
|
||||
|
||||
def bearer_token(auth_header: str) -> Optional[str]:
|
||||
m = re.match(
|
||||
r'Bearer realm="(?P<realm>[^"]+)"(?:,service="(?P<service>[^"]+)")?(?:,scope="(?P<scope>[^"]+)")?',
|
||||
auth_header,
|
||||
)
|
||||
if not m:
|
||||
return None
|
||||
realm = m.group("realm")
|
||||
q = {}
|
||||
if m.group("service"):
|
||||
q["service"] = m.group("service")
|
||||
if m.group("scope"):
|
||||
q["scope"] = m.group("scope")
|
||||
r = SESSION.get(realm, params=q, timeout=20)
|
||||
r.raise_for_status()
|
||||
return r.json().get("token")
|
||||
|
||||
|
||||
def remote_digest(registry: str, repository: str, tag_or_digest: str) -> Optional[str]:
|
||||
"""
|
||||
Return the remote digest for repo:tag WITHOUT pulling.
|
||||
Handles single-arch manifests and multi-arch manifest lists (OCI index).
|
||||
Strategy:
|
||||
- HEAD first (lightweight, usually returns Docker-Content-Digest)
|
||||
- If 401, fetch Bearer token then retry
|
||||
- If still no header, GET and use header if present
|
||||
- As a last resort, sha256 of the exact response body bytes
|
||||
"""
|
||||
if tag_or_digest.startswith("@sha256:"):
|
||||
return tag_or_digest[1:]
|
||||
|
||||
url = f"https://{registry}/v2/{repository}/manifests/{tag_or_digest}"
|
||||
headers = {"Accept": MANIFEST_ACCEPT}
|
||||
|
||||
def do(req_method):
|
||||
r = req_method(url, headers=headers, timeout=20)
|
||||
if r.status_code == 401 and "WWW-Authenticate" in r.headers:
|
||||
token = bearer_token(r.headers["WWW-Authenticate"])
|
||||
if token:
|
||||
headers["Authorization"] = f"Bearer {token}"
|
||||
r = req_method(url, headers=headers, timeout=20)
|
||||
return r
|
||||
|
||||
# Try HEAD
|
||||
r = do(SESSION.head)
|
||||
if r.status_code == 404:
|
||||
return None
|
||||
r.raise_for_status()
|
||||
digest = r.headers.get("Docker-Content-Digest")
|
||||
if digest:
|
||||
return digest
|
||||
|
||||
# Fall back to GET
|
||||
r = do(SESSION.get)
|
||||
if r.status_code == 404:
|
||||
return None
|
||||
r.raise_for_status()
|
||||
|
||||
digest = r.headers.get("Docker-Content-Digest")
|
||||
if digest:
|
||||
return digest
|
||||
|
||||
# Last resort: compute sha256 of the exact payload (works for v2/OCI content)
|
||||
# Note: Some proxies might reserialize; this still provides a consistent signal for comparison runs.
|
||||
body = r.content or b""
|
||||
if body:
|
||||
return "sha256:" + hashlib.sha256(body).hexdigest()
|
||||
|
||||
return None
|
||||
|
||||
# ---------- local docker helpers ----------
|
||||
|
||||
def local_repo_digest(client, image_ref: str) -> Optional[str]:
|
||||
"""
|
||||
Try to find a local RepoDigest for the given image tag/ref.
|
||||
Returns 'sha256:...' or None.
|
||||
"""
|
||||
try:
|
||||
img = client.images.get(image_ref)
|
||||
except docker.errors.ImageNotFound:
|
||||
return None
|
||||
|
||||
repo_digests = img.attrs.get("RepoDigests") or []
|
||||
for d in repo_digests:
|
||||
if "@sha256:" in d:
|
||||
return d.split("@", 1)[1]
|
||||
return None
|
||||
|
||||
|
||||
def read_allowlist(path: Optional[str]) -> Set[str]:
|
||||
if not path:
|
||||
return set()
|
||||
p = Path(path)
|
||||
if not p.exists():
|
||||
print(f"WARNING: images file '{path}' not found; continuing in check-only mode.", file=sys.stderr)
|
||||
return set()
|
||||
out = set()
|
||||
for line in p.read_text().splitlines():
|
||||
name = line.strip()
|
||||
if name and not name.startswith("#"):
|
||||
out.add(name)
|
||||
return out
|
||||
|
||||
|
||||
def run_compose(cmd: list[str], cwd: Optional[Path]) -> subprocess.CompletedProcess:
|
||||
return subprocess.run(cmd, cwd=str(cwd) if cwd else None, text=True, capture_output=True)
|
||||
|
||||
def main():
|
||||
ap = argparse.ArgumentParser(description="Check Compose containers for image updates without pulling.")
|
||||
ap.add_argument("--images-file", default=None, help="Path to file listing container or service names to update.")
|
||||
ap.add_argument("--project-dir", default=None, help="Fallback compose project dir if label not available.")
|
||||
ap.add_argument("--dry-run", action="store_true", help="Show update actions but do not change anything.")
|
||||
args = ap.parse_args()
|
||||
|
||||
allow = read_allowlist(args.images_file)
|
||||
fallback_dir = Path(args.project_dir).resolve() if args.project_dir else None
|
||||
|
||||
try:
|
||||
client = docker.from_env()
|
||||
except Exception as e:
|
||||
print(f"Failed to connect to Docker daemon: {e}", file=sys.stderr)
|
||||
sys.exit(2)
|
||||
|
||||
containers = client.containers.list() # running only
|
||||
if not containers:
|
||||
print("No running containers.")
|
||||
return
|
||||
|
||||
print(f"{'CONTAINER':28} {'SERVICE':16} {'IMAGE':46} {'STATUS':18} DETAILS")
|
||||
|
||||
# We’ll group info and remember compose project metadata per container
|
||||
updates_to_apply: Dict[str, dict] = {}
|
||||
|
||||
for c in containers:
|
||||
labels = c.labels or {}
|
||||
project = labels.get("com.docker.compose.project")
|
||||
service = labels.get("com.docker.compose.service")
|
||||
# working dir & config files are present in newer compose versions; use if available
|
||||
proj_workdir = labels.get("com.docker.compose.project.working_dir")
|
||||
proj_config_files = labels.get("com.docker.compose.project.config_files") # comma-separated
|
||||
compose_dir = Path(proj_workdir).resolve() if proj_workdir else fallback_dir
|
||||
|
||||
# best-effort image ref for lookup
|
||||
image_ref = (c.image.tags[0] if c.image.tags else (c.image.attrs.get("RepoDigests", ["<untagged>"])[0]))
|
||||
|
||||
# compute local digest (None for locally-built/unpinned)
|
||||
local_d = local_repo_digest(client, image_ref)
|
||||
if local_d is None:
|
||||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} no local digest (locally built/unpinned)")
|
||||
continue
|
||||
|
||||
# compute remote digest from registry manifest (no pull)
|
||||
reg, repo, tag = normalize_image(image_ref)
|
||||
try:
|
||||
remote_d = remote_digest(reg, repo, tag)
|
||||
except requests.HTTPError as he:
|
||||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} registry error: {he}")
|
||||
continue
|
||||
except requests.RequestException as rexc:
|
||||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} network error: {rexc}")
|
||||
continue
|
||||
|
||||
if not remote_d:
|
||||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UNKNOWN':18} could not obtain remote digest")
|
||||
continue
|
||||
|
||||
if local_d == remote_d:
|
||||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UP-TO-DATE':18}")
|
||||
else:
|
||||
print(f"{c.name:28} {str(service or ''):16} {image_ref:46} {'UPDATE AVAILABLE':18} local={local_d[:18]}… remote={remote_d[:18]}…")
|
||||
|
||||
# Should we update this one?
|
||||
listed = (
|
||||
(c.name in allow) or
|
||||
(service in allow if service else False)
|
||||
)
|
||||
if listed and service and project:
|
||||
updates_to_apply[c.name] = {
|
||||
"service": service,
|
||||
"project": project,
|
||||
"compose_dir": compose_dir,
|
||||
"config_files": proj_config_files.split(",") if proj_config_files else None,
|
||||
}
|
||||
|
||||
if not updates_to_apply:
|
||||
if allow:
|
||||
print("\nNo listed containers/services require updates (or labels/project info not found).")
|
||||
return
|
||||
|
||||
# Apply updates via docker compose for each affected service
|
||||
print("\nPlanned updates:")
|
||||
for name, meta in updates_to_apply.items():
|
||||
svc = meta["service"]
|
||||
cdir = meta["compose_dir"] or Path.cwd()
|
||||
cfgs = meta["config_files"]
|
||||
|
||||
# Build -f args; resolve relative files against project dir
|
||||
cfg_args: list[str] = []
|
||||
if cfgs:
|
||||
for f in cfgs:
|
||||
p = Path(f)
|
||||
if not p.is_absolute():
|
||||
p = (cdir / p).resolve()
|
||||
cfg_args += ["-f", str(p)]
|
||||
|
||||
base = ["docker", "compose",
|
||||
"--project-directory", str(cdir)]
|
||||
cmd_pull = [*base, *cfg_args, "pull", svc]
|
||||
cmd_up = [*base, *cfg_args, "up", "-d", "--no-deps", svc]
|
||||
|
||||
print(f" - {name}: (service '{svc}')")
|
||||
if args.dry_run:
|
||||
print(f" would run: {' '.join(cmd_pull)} (cwd={cdir})")
|
||||
print(f" would run: {' '.join(cmd_up)} (cwd={cdir})")
|
||||
continue
|
||||
|
||||
res = run_compose(cmd_pull, cdir)
|
||||
if res.returncode != 0:
|
||||
print(f" pull failed: {res.stderr.strip() or res.stdout.strip()}")
|
||||
continue
|
||||
else:
|
||||
print(f" pull ok: {res.stdout.strip().splitlines()[-1] if res.stdout else 'done'}")
|
||||
|
||||
res2 = run_compose(cmd_up, cdir)
|
||||
if res2.returncode != 0:
|
||||
print(f" up failed: {res2.stderr.strip() or res2.stdout.strip()}")
|
||||
else:
|
||||
print(f" up ok: {res2.stdout.strip().splitlines()[-1] if res2.stdout else 'recreated'}")
|
||||
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
Reference in New Issue
Block a user