authentik.blueprints.v1.oci

OCI Client

  1"""OCI Client"""
  2
  3from typing import Any
  4from urllib.parse import ParseResult, urlparse
  5
  6from opencontainers.distribution.reggie import (
  7    NewClient,
  8    WithDebug,
  9    WithDefaultName,
 10    WithDigest,
 11    WithReference,
 12    WithUserAgent,
 13    WithUsernamePassword,
 14)
 15from requests.exceptions import RequestException
 16from structlog import get_logger
 17from structlog.stdlib import BoundLogger
 18
 19from authentik.lib.sentry import SentryIgnoredException
 20from authentik.lib.utils.http import authentik_user_agent
 21
 22OCI_MEDIA_TYPE = "application/vnd.goauthentik.blueprint.v1+yaml"
 23OCI_PREFIX = "oci://"
 24
 25
 26class OCIException(SentryIgnoredException):
 27    """OCI-related errors"""
 28
 29
 30class BlueprintOCIClient:
 31    """Blueprint OCI Client"""
 32
 33    url: ParseResult
 34    sanitized_url: str
 35    logger: BoundLogger
 36    ref: str
 37    client: NewClient
 38
 39    def __init__(self, url: str) -> None:
 40        self._parse_url(url)
 41        self.logger = get_logger().bind(url=self.sanitized_url)
 42
 43        self.ref = "latest"
 44        # Remove the leading slash of the path to convert it to an image name
 45        path = self.url.path[1:]
 46        if ":" in path:
 47            # if there's a colon in the path, use everything after it as a ref
 48            path, _, self.ref = path.partition(":")
 49        base_url = f"https://{self.url.hostname}"
 50        if self.url.port:
 51            base_url += f":{self.url.port}"
 52        self.client = NewClient(
 53            base_url,
 54            WithUserAgent(authentik_user_agent()),
 55            WithUsernamePassword(self.url.username, self.url.password),
 56            WithDefaultName(path),
 57            WithDebug(True),
 58        )
 59
 60    def _parse_url(self, url: str):
 61        self.url = urlparse(url)
 62        netloc = self.url.netloc
 63        if "@" in netloc:
 64            netloc = netloc[netloc.index("@") + 1 :]
 65        self.sanitized_url = self.url._replace(netloc=netloc).geturl()
 66
 67    def fetch_manifests(self) -> dict[str, Any]:
 68        """Fetch manifests for ref"""
 69        self.logger.info("Fetching OCI manifests for blueprint")
 70        manifest_request = self.client.NewRequest(
 71            "GET",
 72            "/v2/<name>/manifests/<reference>",
 73            WithReference(self.ref),
 74        ).SetHeader("Accept", "application/vnd.oci.image.manifest.v1+json")
 75        try:
 76            manifest_response = self.client.Do(manifest_request)
 77            manifest_response.raise_for_status()
 78        except RequestException as exc:
 79            raise OCIException(exc) from exc
 80        manifest = manifest_response.json()
 81        if "errors" in manifest:
 82            raise OCIException(manifest["errors"])
 83        return manifest
 84
 85    def fetch_blobs(self, manifest: dict[str, Any]):
 86        """Fetch blob based on manifest info"""
 87        blob = None
 88        for layer in manifest.get("layers", []):
 89            if layer.get("mediaType", "") == OCI_MEDIA_TYPE:
 90                blob = layer.get("digest")
 91                self.logger.debug("Found layer with matching media type", blob=blob)
 92        if not blob:
 93            raise OCIException("Blob not found")
 94
 95        blob_request = self.client.NewRequest(
 96            "GET",
 97            "/v2/<name>/blobs/<digest>",
 98            WithDigest(blob),
 99        )
100        try:
101            blob_response = self.client.Do(blob_request)
102            blob_response.raise_for_status()
103            return blob_response.text
104        except RequestException as exc:
105            raise OCIException(exc) from exc
OCI_MEDIA_TYPE = 'application/vnd.goauthentik.blueprint.v1+yaml'
OCI_PREFIX = 'oci://'
class OCIException(authentik.lib.sentry.SentryIgnoredException):
class OCIException(SentryIgnoredException):
    """OCI-related errors

    Raised by the blueprint OCI client when a registry request fails, when the
    manifest response reports errors, or when no matching blob layer is found.
    """

OCI-related errors

class BlueprintOCIClient:
 31class BlueprintOCIClient:
 32    """Blueprint OCI Client"""
 33
 34    url: ParseResult
 35    sanitized_url: str
 36    logger: BoundLogger
 37    ref: str
 38    client: NewClient
 39
 40    def __init__(self, url: str) -> None:
 41        self._parse_url(url)
 42        self.logger = get_logger().bind(url=self.sanitized_url)
 43
 44        self.ref = "latest"
 45        # Remove the leading slash of the path to convert it to an image name
 46        path = self.url.path[1:]
 47        if ":" in path:
 48            # if there's a colon in the path, use everything after it as a ref
 49            path, _, self.ref = path.partition(":")
 50        base_url = f"https://{self.url.hostname}"
 51        if self.url.port:
 52            base_url += f":{self.url.port}"
 53        self.client = NewClient(
 54            base_url,
 55            WithUserAgent(authentik_user_agent()),
 56            WithUsernamePassword(self.url.username, self.url.password),
 57            WithDefaultName(path),
 58            WithDebug(True),
 59        )
 60
 61    def _parse_url(self, url: str):
 62        self.url = urlparse(url)
 63        netloc = self.url.netloc
 64        if "@" in netloc:
 65            netloc = netloc[netloc.index("@") + 1 :]
 66        self.sanitized_url = self.url._replace(netloc=netloc).geturl()
 67
 68    def fetch_manifests(self) -> dict[str, Any]:
 69        """Fetch manifests for ref"""
 70        self.logger.info("Fetching OCI manifests for blueprint")
 71        manifest_request = self.client.NewRequest(
 72            "GET",
 73            "/v2/<name>/manifests/<reference>",
 74            WithReference(self.ref),
 75        ).SetHeader("Accept", "application/vnd.oci.image.manifest.v1+json")
 76        try:
 77            manifest_response = self.client.Do(manifest_request)
 78            manifest_response.raise_for_status()
 79        except RequestException as exc:
 80            raise OCIException(exc) from exc
 81        manifest = manifest_response.json()
 82        if "errors" in manifest:
 83            raise OCIException(manifest["errors"])
 84        return manifest
 85
 86    def fetch_blobs(self, manifest: dict[str, Any]):
 87        """Fetch blob based on manifest info"""
 88        blob = None
 89        for layer in manifest.get("layers", []):
 90            if layer.get("mediaType", "") == OCI_MEDIA_TYPE:
 91                blob = layer.get("digest")
 92                self.logger.debug("Found layer with matching media type", blob=blob)
 93        if not blob:
 94            raise OCIException("Blob not found")
 95
 96        blob_request = self.client.NewRequest(
 97            "GET",
 98            "/v2/<name>/blobs/<digest>",
 99            WithDigest(blob),
100        )
101        try:
102            blob_response = self.client.Do(blob_request)
103            blob_response.raise_for_status()
104            return blob_response.text
105        except RequestException as exc:
106            raise OCIException(exc) from exc

Blueprint OCI Client

BlueprintOCIClient(url: str)
40    def __init__(self, url: str) -> None:
41        self._parse_url(url)
42        self.logger = get_logger().bind(url=self.sanitized_url)
43
44        self.ref = "latest"
45        # Remove the leading slash of the path to convert it to an image name
46        path = self.url.path[1:]
47        if ":" in path:
48            # if there's a colon in the path, use everything after it as a ref
49            path, _, self.ref = path.partition(":")
50        base_url = f"https://{self.url.hostname}"
51        if self.url.port:
52            base_url += f":{self.url.port}"
53        self.client = NewClient(
54            base_url,
55            WithUserAgent(authentik_user_agent()),
56            WithUsernamePassword(self.url.username, self.url.password),
57            WithDefaultName(path),
58            WithDebug(True),
59        )
url: urllib.parse.ParseResult
sanitized_url: str
logger: structlog.stdlib.BoundLogger
ref: str
client: opencontainers.distribution.reggie.client.NewClient
def fetch_manifests(self) -> dict[str, typing.Any]:
68    def fetch_manifests(self) -> dict[str, Any]:
69        """Fetch manifests for ref"""
70        self.logger.info("Fetching OCI manifests for blueprint")
71        manifest_request = self.client.NewRequest(
72            "GET",
73            "/v2/<name>/manifests/<reference>",
74            WithReference(self.ref),
75        ).SetHeader("Accept", "application/vnd.oci.image.manifest.v1+json")
76        try:
77            manifest_response = self.client.Do(manifest_request)
78            manifest_response.raise_for_status()
79        except RequestException as exc:
80            raise OCIException(exc) from exc
81        manifest = manifest_response.json()
82        if "errors" in manifest:
83            raise OCIException(manifest["errors"])
84        return manifest

Fetch manifests for ref

def fetch_blobs(self, manifest: dict[str, typing.Any]):
 86    def fetch_blobs(self, manifest: dict[str, Any]):
 87        """Fetch blob based on manifest info"""
 88        blob = None
 89        for layer in manifest.get("layers", []):
 90            if layer.get("mediaType", "") == OCI_MEDIA_TYPE:
 91                blob = layer.get("digest")
 92                self.logger.debug("Found layer with matching media type", blob=blob)
 93        if not blob:
 94            raise OCIException("Blob not found")
 95
 96        blob_request = self.client.NewRequest(
 97            "GET",
 98            "/v2/<name>/blobs/<digest>",
 99            WithDigest(blob),
100        )
101        try:
102            blob_response = self.client.Do(blob_request)
103            blob_response.raise_for_status()
104            return blob_response.text
105        except RequestException as exc:
106            raise OCIException(exc) from exc

Fetch blob based on manifest info