authentik.blueprints.v1.oci
OCI Client
1"""OCI Client""" 2 3from typing import Any 4from urllib.parse import ParseResult, urlparse 5 6from opencontainers.distribution.reggie import ( 7 NewClient, 8 WithDebug, 9 WithDefaultName, 10 WithDigest, 11 WithReference, 12 WithUserAgent, 13 WithUsernamePassword, 14) 15from requests.exceptions import RequestException 16from structlog import get_logger 17from structlog.stdlib import BoundLogger 18 19from authentik.lib.sentry import SentryIgnoredException 20from authentik.lib.utils.http import authentik_user_agent 21 22OCI_MEDIA_TYPE = "application/vnd.goauthentik.blueprint.v1+yaml" 23OCI_PREFIX = "oci://" 24 25 26class OCIException(SentryIgnoredException): 27 """OCI-related errors""" 28 29 30class BlueprintOCIClient: 31 """Blueprint OCI Client""" 32 33 url: ParseResult 34 sanitized_url: str 35 logger: BoundLogger 36 ref: str 37 client: NewClient 38 39 def __init__(self, url: str) -> None: 40 self._parse_url(url) 41 self.logger = get_logger().bind(url=self.sanitized_url) 42 43 self.ref = "latest" 44 # Remove the leading slash of the path to convert it to an image name 45 path = self.url.path[1:] 46 if ":" in path: 47 # if there's a colon in the path, use everything after it as a ref 48 path, _, self.ref = path.partition(":") 49 base_url = f"https://{self.url.hostname}" 50 if self.url.port: 51 base_url += f":{self.url.port}" 52 self.client = NewClient( 53 base_url, 54 WithUserAgent(authentik_user_agent()), 55 WithUsernamePassword(self.url.username, self.url.password), 56 WithDefaultName(path), 57 WithDebug(True), 58 ) 59 60 def _parse_url(self, url: str): 61 self.url = urlparse(url) 62 netloc = self.url.netloc 63 if "@" in netloc: 64 netloc = netloc[netloc.index("@") + 1 :] 65 self.sanitized_url = self.url._replace(netloc=netloc).geturl() 66 67 def fetch_manifests(self) -> dict[str, Any]: 68 """Fetch manifests for ref""" 69 self.logger.info("Fetching OCI manifests for blueprint") 70 manifest_request = self.client.NewRequest( 71 "GET", 72 "/v2/<name>/manifests/<reference>", 73 WithReference(self.ref), 74 ).SetHeader("Accept", "application/vnd.oci.image.manifest.v1+json") 75 try: 76 manifest_response = self.client.Do(manifest_request) 77 manifest_response.raise_for_status() 78 except RequestException as exc: 79 raise OCIException(exc) from exc 80 manifest = manifest_response.json() 81 if "errors" in manifest: 82 raise OCIException(manifest["errors"]) 83 return manifest 84 85 def fetch_blobs(self, manifest: dict[str, Any]): 86 """Fetch blob based on manifest info""" 87 blob = None 88 for layer in manifest.get("layers", []): 89 if layer.get("mediaType", "") == OCI_MEDIA_TYPE: 90 blob = layer.get("digest") 91 self.logger.debug("Found layer with matching media type", blob=blob) 92 if not blob: 93 raise OCIException("Blob not found") 94 95 blob_request = self.client.NewRequest( 96 "GET", 97 "/v2/<name>/blobs/<digest>", 98 WithDigest(blob), 99 ) 100 try: 101 blob_response = self.client.Do(blob_request) 102 blob_response.raise_for_status() 103 return blob_response.text 104 except RequestException as exc: 105 raise OCIException(exc) from exc
OCI_MEDIA_TYPE =
'application/vnd.goauthentik.blueprint.v1+yaml'
OCI_PREFIX =
'oci://'
OCI-related errors
class
BlueprintOCIClient:
31class BlueprintOCIClient: 32 """Blueprint OCI Client""" 33 34 url: ParseResult 35 sanitized_url: str 36 logger: BoundLogger 37 ref: str 38 client: NewClient 39 40 def __init__(self, url: str) -> None: 41 self._parse_url(url) 42 self.logger = get_logger().bind(url=self.sanitized_url) 43 44 self.ref = "latest" 45 # Remove the leading slash of the path to convert it to an image name 46 path = self.url.path[1:] 47 if ":" in path: 48 # if there's a colon in the path, use everything after it as a ref 49 path, _, self.ref = path.partition(":") 50 base_url = f"https://{self.url.hostname}" 51 if self.url.port: 52 base_url += f":{self.url.port}" 53 self.client = NewClient( 54 base_url, 55 WithUserAgent(authentik_user_agent()), 56 WithUsernamePassword(self.url.username, self.url.password), 57 WithDefaultName(path), 58 WithDebug(True), 59 ) 60 61 def _parse_url(self, url: str): 62 self.url = urlparse(url) 63 netloc = self.url.netloc 64 if "@" in netloc: 65 netloc = netloc[netloc.index("@") + 1 :] 66 self.sanitized_url = self.url._replace(netloc=netloc).geturl() 67 68 def fetch_manifests(self) -> dict[str, Any]: 69 """Fetch manifests for ref""" 70 self.logger.info("Fetching OCI manifests for blueprint") 71 manifest_request = self.client.NewRequest( 72 "GET", 73 "/v2/<name>/manifests/<reference>", 74 WithReference(self.ref), 75 ).SetHeader("Accept", "application/vnd.oci.image.manifest.v1+json") 76 try: 77 manifest_response = self.client.Do(manifest_request) 78 manifest_response.raise_for_status() 79 except RequestException as exc: 80 raise OCIException(exc) from exc 81 manifest = manifest_response.json() 82 if "errors" in manifest: 83 raise OCIException(manifest["errors"]) 84 return manifest 85 86 def fetch_blobs(self, manifest: dict[str, Any]): 87 """Fetch blob based on manifest info""" 88 blob = None 89 for layer in manifest.get("layers", []): 90 if layer.get("mediaType", "") == OCI_MEDIA_TYPE: 91 blob = layer.get("digest") 92 self.logger.debug("Found layer with matching media type", blob=blob) 93 if not blob: 94 raise OCIException("Blob not found") 95 96 blob_request = self.client.NewRequest( 97 "GET", 98 "/v2/<name>/blobs/<digest>", 99 WithDigest(blob), 100 ) 101 try: 102 blob_response = self.client.Do(blob_request) 103 blob_response.raise_for_status() 104 return blob_response.text 105 except RequestException as exc: 106 raise OCIException(exc) from exc
Blueprint OCI Client
BlueprintOCIClient(url: str)
40 def __init__(self, url: str) -> None: 41 self._parse_url(url) 42 self.logger = get_logger().bind(url=self.sanitized_url) 43 44 self.ref = "latest" 45 # Remove the leading slash of the path to convert it to an image name 46 path = self.url.path[1:] 47 if ":" in path: 48 # if there's a colon in the path, use everything after it as a ref 49 path, _, self.ref = path.partition(":") 50 base_url = f"https://{self.url.hostname}" 51 if self.url.port: 52 base_url += f":{self.url.port}" 53 self.client = NewClient( 54 base_url, 55 WithUserAgent(authentik_user_agent()), 56 WithUsernamePassword(self.url.username, self.url.password), 57 WithDefaultName(path), 58 WithDebug(True), 59 )
def
fetch_manifests(self) -> dict[str, typing.Any]:
68 def fetch_manifests(self) -> dict[str, Any]: 69 """Fetch manifests for ref""" 70 self.logger.info("Fetching OCI manifests for blueprint") 71 manifest_request = self.client.NewRequest( 72 "GET", 73 "/v2/<name>/manifests/<reference>", 74 WithReference(self.ref), 75 ).SetHeader("Accept", "application/vnd.oci.image.manifest.v1+json") 76 try: 77 manifest_response = self.client.Do(manifest_request) 78 manifest_response.raise_for_status() 79 except RequestException as exc: 80 raise OCIException(exc) from exc 81 manifest = manifest_response.json() 82 if "errors" in manifest: 83 raise OCIException(manifest["errors"]) 84 return manifest
Fetch manifests for ref
def
fetch_blobs(self, manifest: dict[str, typing.Any]):
86 def fetch_blobs(self, manifest: dict[str, Any]): 87 """Fetch blob based on manifest info""" 88 blob = None 89 for layer in manifest.get("layers", []): 90 if layer.get("mediaType", "") == OCI_MEDIA_TYPE: 91 blob = layer.get("digest") 92 self.logger.debug("Found layer with matching media type", blob=blob) 93 if not blob: 94 raise OCIException("Blob not found") 95 96 blob_request = self.client.NewRequest( 97 "GET", 98 "/v2/<name>/blobs/<digest>", 99 WithDigest(blob), 100 ) 101 try: 102 blob_response = self.client.Do(blob_request) 103 blob_response.raise_for_status() 104 return blob_response.text 105 except RequestException as exc: 106 raise OCIException(exc) from exc
Fetch blob based on manifest info