authentik.outposts.controllers.k8s.base
Base Kubernetes Reconciler
1"""Base Kubernetes Reconciler""" 2 3import re 4from dataclasses import asdict 5from json import dumps 6from typing import TYPE_CHECKING, TypeVar 7 8from dacite.core import from_dict 9from django.http import HttpResponseNotFound 10from django.utils.text import slugify 11from jsonpatch import JsonPatchConflict, JsonPatchException, JsonPatchTestFailed, apply_patch 12from kubernetes.client import ApiClient, V1ObjectMeta 13from kubernetes.client.exceptions import ApiException, OpenApiException 14from kubernetes.client.models.v1_deployment import V1Deployment 15from kubernetes.client.models.v1_pod import V1Pod 16from requests import Response 17from structlog.stdlib import get_logger 18from urllib3.exceptions import HTTPError 19 20from authentik import authentik_version 21from authentik.outposts.apps import MANAGED_OUTPOST 22from authentik.outposts.controllers.base import ControllerException 23from authentik.outposts.controllers.k8s.triggers import NeedsRecreate, NeedsUpdate 24 25if TYPE_CHECKING: 26 from authentik.outposts.controllers.kubernetes import KubernetesController 27 28T = TypeVar("T", V1Pod, V1Deployment) 29 30 31def get_version() -> str: 32 """Wrapper for authentik_version() to make testing easier""" 33 return authentik_version() 34 35 36class KubernetesObjectReconciler[T]: 37 """Base Kubernetes Reconciler, handles the basic logic.""" 38 39 controller: KubernetesController 40 41 def __init__(self, controller: KubernetesController): 42 self.controller = controller 43 self.namespace = controller.outpost.config.kubernetes_namespace 44 self.logger = get_logger().bind(type=self.__class__.__name__) 45 46 def get_patch(self): 47 """Get any patches that apply to this CRD""" 48 patches = self.controller.outpost.config.kubernetes_json_patches 49 if not patches: 50 return None 51 return patches.get(self.reconciler_name(), None) 52 53 @property 54 def is_embedded(self) -> bool: 55 """Return true if the current outpost is embedded""" 56 return self.controller.outpost.managed == MANAGED_OUTPOST 57 58 @staticmethod 59 def reconciler_name() -> str: 60 """A name this reconciler is identified by in the configuration""" 61 raise NotImplementedError 62 63 @property 64 def noop(self) -> bool: 65 """Return true if this object should not be created/updated/deleted in this cluster""" 66 return False 67 68 @property 69 def name(self) -> str: 70 """Get the name of the object this reconciler manages""" 71 72 base_name = ( 73 self.controller.outpost.config.object_naming_template 74 % { 75 "name": slugify(self.controller.outpost.name), 76 "uuid": self.controller.outpost.uuid.hex, 77 } 78 ).lower() 79 80 formatted = slugify(base_name) 81 formatted = re.sub(r"[^a-z0-9-]", "-", formatted) 82 formatted = re.sub(r"-+", "-", formatted) 83 formatted = formatted[:63] 84 85 if not formatted: 86 formatted = f"outpost-{self.controller.outpost.uuid.hex}"[:63] 87 88 return formatted 89 90 def get_patched_reference_object(self) -> T: 91 """Get patched reference object""" 92 reference = self.get_reference_object() 93 patch = self.get_patch() 94 try: 95 json = ApiClient().sanitize_for_serialization(reference) 96 # Custom objects will not be known to the clients openapi types 97 except AttributeError: 98 json = asdict(reference) 99 try: 100 ref = json 101 if patch is not None: 102 ref = apply_patch(json, patch) 103 except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc: 104 raise ControllerException(f"JSON Patch failed: {exc}") from exc 105 mock_response = Response() 106 mock_response.data = dumps(ref) 107 108 try: 109 result = ApiClient().deserialize(mock_response, reference.__class__.__name__) 110 # Custom objects will not be known to the clients openapi types 111 except AttributeError: 112 result = from_dict(reference.__class__, data=ref) 113 114 return result 115 116 def up(self): 117 """Create object if it doesn't exist, update if needed or recreate if needed.""" 118 current = None 119 if self.noop: 120 self.logger.debug("Object is noop") 121 return 122 reference = self.get_patched_reference_object() 123 try: 124 try: 125 current = self.retrieve() 126 except (OpenApiException, HTTPError) as exc: 127 if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code: 128 self.logger.debug("Failed to get current, triggering recreate") 129 raise NeedsRecreate from exc 130 self.logger.debug("Other unhandled error", exc=exc) 131 raise exc 132 self.reconcile(current, reference) 133 except NeedsUpdate: 134 try: 135 self.update(current, reference) 136 self.logger.debug("Updating") 137 except (OpenApiException, HTTPError) as exc: 138 if isinstance(exc, ApiException) and exc.status == 422: # noqa: PLR2004 139 self.logger.debug("Failed to update current, triggering re-create") 140 self._recreate(current=current, reference=reference) 141 return 142 self.logger.debug("Other unhandled error", exc=exc) 143 raise exc 144 except NeedsRecreate: 145 self._recreate(current=current, reference=reference) 146 else: 147 self.logger.debug("Object is up-to-date.") 148 149 def _recreate(self, reference: T, current: T | None = None): 150 """Recreate object""" 151 self.logger.debug("Recreate requested") 152 if current: 153 self.logger.debug("Deleted old") 154 self.delete(current) 155 else: 156 self.logger.debug("No old found, creating") 157 self.logger.debug("Creating") 158 self.create(reference) 159 160 def down(self): 161 """Delete object if found""" 162 if self.noop: 163 self.logger.debug("Object is noop") 164 return 165 try: 166 current = self.retrieve() 167 self.delete(current) 168 self.logger.debug("Removing") 169 except (OpenApiException, HTTPError) as exc: 170 if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code: 171 self.logger.debug("Failed to get current, assuming non-existent") 172 return 173 self.logger.debug("Other unhandled error", exc=exc) 174 raise exc 175 176 def get_reference_object(self) -> T: 177 """Return object as it should be""" 178 raise NotImplementedError 179 180 def reconcile(self, current: T, reference: T): 181 """Check what operations should be done, should be raised as 182 ReconcileTrigger""" 183 if current.metadata.labels != reference.metadata.labels: 184 raise NeedsUpdate() 185 186 patch = self.get_patch() 187 if patch is not None: 188 try: 189 current_json = ApiClient().sanitize_for_serialization(current) 190 except AttributeError: 191 current_json = asdict(current) 192 try: 193 if apply_patch(current_json, patch) != current_json: 194 raise NeedsUpdate() 195 except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc: 196 raise ControllerException(f"JSON Patch failed: {exc}") from exc 197 198 def create(self, reference: T): 199 """API Wrapper to create object""" 200 raise NotImplementedError 201 202 def retrieve(self) -> T: 203 """API Wrapper to retrieve object""" 204 raise NotImplementedError 205 206 def delete(self, reference: T): 207 """API Wrapper to delete object""" 208 raise NotImplementedError 209 210 def update(self, current: T, reference: T): 211 """API Wrapper to update object""" 212 raise NotImplementedError 213 214 def get_object_meta(self, **kwargs) -> V1ObjectMeta: 215 """Get common object metadata""" 216 return V1ObjectMeta( 217 namespace=self.namespace, 218 labels={ 219 "app.kubernetes.io/instance": slugify(self.controller.outpost.name), 220 "app.kubernetes.io/managed-by": "goauthentik.io", 221 "app.kubernetes.io/name": f"authentik-{self.controller.outpost.type.lower()}", 222 "app.kubernetes.io/version": get_version().replace("+", "-"), 223 "goauthentik.io/outpost-name": slugify(self.controller.outpost.name), 224 "goauthentik.io/outpost-type": str(self.controller.outpost.type), 225 "goauthentik.io/outpost-uuid": self.controller.outpost.uuid.hex, 226 }, 227 **kwargs, 228 )
def
get_version() -> str:
32def get_version() -> str: 33 """Wrapper for authentik_version() to make testing easier""" 34 return authentik_version()
Wrapper for authentik_version() to make testing easier
class
KubernetesObjectReconciler(typing.Generic[T]):
37class KubernetesObjectReconciler[T]: 38 """Base Kubernetes Reconciler, handles the basic logic.""" 39 40 controller: KubernetesController 41 42 def __init__(self, controller: KubernetesController): 43 self.controller = controller 44 self.namespace = controller.outpost.config.kubernetes_namespace 45 self.logger = get_logger().bind(type=self.__class__.__name__) 46 47 def get_patch(self): 48 """Get any patches that apply to this CRD""" 49 patches = self.controller.outpost.config.kubernetes_json_patches 50 if not patches: 51 return None 52 return patches.get(self.reconciler_name(), None) 53 54 @property 55 def is_embedded(self) -> bool: 56 """Return true if the current outpost is embedded""" 57 return self.controller.outpost.managed == MANAGED_OUTPOST 58 59 @staticmethod 60 def reconciler_name() -> str: 61 """A name this reconciler is identified by in the configuration""" 62 raise NotImplementedError 63 64 @property 65 def noop(self) -> bool: 66 """Return true if this object should not be created/updated/deleted in this cluster""" 67 return False 68 69 @property 70 def name(self) -> str: 71 """Get the name of the object this reconciler manages""" 72 73 base_name = ( 74 self.controller.outpost.config.object_naming_template 75 % { 76 "name": slugify(self.controller.outpost.name), 77 "uuid": self.controller.outpost.uuid.hex, 78 } 79 ).lower() 80 81 formatted = slugify(base_name) 82 formatted = re.sub(r"[^a-z0-9-]", "-", formatted) 83 formatted = re.sub(r"-+", "-", formatted) 84 formatted = formatted[:63] 85 86 if not formatted: 87 formatted = f"outpost-{self.controller.outpost.uuid.hex}"[:63] 88 89 return formatted 90 91 def get_patched_reference_object(self) -> T: 92 """Get patched reference object""" 93 reference = self.get_reference_object() 94 patch = self.get_patch() 95 try: 96 json = ApiClient().sanitize_for_serialization(reference) 97 # Custom objects will not be known to the clients openapi types 98 except AttributeError: 99 json = asdict(reference) 100 try: 101 ref = json 102 if patch is not None: 103 ref = apply_patch(json, patch) 104 except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc: 105 raise ControllerException(f"JSON Patch failed: {exc}") from exc 106 mock_response = Response() 107 mock_response.data = dumps(ref) 108 109 try: 110 result = ApiClient().deserialize(mock_response, reference.__class__.__name__) 111 # Custom objects will not be known to the clients openapi types 112 except AttributeError: 113 result = from_dict(reference.__class__, data=ref) 114 115 return result 116 117 def up(self): 118 """Create object if it doesn't exist, update if needed or recreate if needed.""" 119 current = None 120 if self.noop: 121 self.logger.debug("Object is noop") 122 return 123 reference = self.get_patched_reference_object() 124 try: 125 try: 126 current = self.retrieve() 127 except (OpenApiException, HTTPError) as exc: 128 if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code: 129 self.logger.debug("Failed to get current, triggering recreate") 130 raise NeedsRecreate from exc 131 self.logger.debug("Other unhandled error", exc=exc) 132 raise exc 133 self.reconcile(current, reference) 134 except NeedsUpdate: 135 try: 136 self.update(current, reference) 137 self.logger.debug("Updating") 138 except (OpenApiException, HTTPError) as exc: 139 if isinstance(exc, ApiException) and exc.status == 422: # noqa: PLR2004 140 self.logger.debug("Failed to update current, triggering re-create") 141 self._recreate(current=current, reference=reference) 142 return 143 self.logger.debug("Other unhandled error", exc=exc) 144 raise exc 145 except NeedsRecreate: 146 self._recreate(current=current, reference=reference) 147 else: 148 self.logger.debug("Object is up-to-date.") 149 150 def _recreate(self, reference: T, current: T | None = None): 151 """Recreate object""" 152 self.logger.debug("Recreate requested") 153 if current: 154 self.logger.debug("Deleted old") 155 self.delete(current) 156 else: 157 self.logger.debug("No old found, creating") 158 self.logger.debug("Creating") 159 self.create(reference) 160 161 def down(self): 162 """Delete object if found""" 163 if self.noop: 164 self.logger.debug("Object is noop") 165 return 166 try: 167 current = self.retrieve() 168 self.delete(current) 169 self.logger.debug("Removing") 170 except (OpenApiException, HTTPError) as exc: 171 if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code: 172 self.logger.debug("Failed to get current, assuming non-existent") 173 return 174 self.logger.debug("Other unhandled error", exc=exc) 175 raise exc 176 177 def get_reference_object(self) -> T: 178 """Return object as it should be""" 179 raise NotImplementedError 180 181 def reconcile(self, current: T, reference: T): 182 """Check what operations should be done, should be raised as 183 ReconcileTrigger""" 184 if current.metadata.labels != reference.metadata.labels: 185 raise NeedsUpdate() 186 187 patch = self.get_patch() 188 if patch is not None: 189 try: 190 current_json = ApiClient().sanitize_for_serialization(current) 191 except AttributeError: 192 current_json = asdict(current) 193 try: 194 if apply_patch(current_json, patch) != current_json: 195 raise NeedsUpdate() 196 except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc: 197 raise ControllerException(f"JSON Patch failed: {exc}") from exc 198 199 def create(self, reference: T): 200 """API Wrapper to create object""" 201 raise NotImplementedError 202 203 def retrieve(self) -> T: 204 """API Wrapper to retrieve object""" 205 raise NotImplementedError 206 207 def delete(self, reference: T): 208 """API Wrapper to delete object""" 209 raise NotImplementedError 210 211 def update(self, current: T, reference: T): 212 """API Wrapper to update object""" 213 raise NotImplementedError 214 215 def get_object_meta(self, **kwargs) -> V1ObjectMeta: 216 """Get common object metadata""" 217 return V1ObjectMeta( 218 namespace=self.namespace, 219 labels={ 220 "app.kubernetes.io/instance": slugify(self.controller.outpost.name), 221 "app.kubernetes.io/managed-by": "goauthentik.io", 222 "app.kubernetes.io/name": f"authentik-{self.controller.outpost.type.lower()}", 223 "app.kubernetes.io/version": get_version().replace("+", "-"), 224 "goauthentik.io/outpost-name": slugify(self.controller.outpost.name), 225 "goauthentik.io/outpost-type": str(self.controller.outpost.type), 226 "goauthentik.io/outpost-uuid": self.controller.outpost.uuid.hex, 227 }, 228 **kwargs, 229 )
Base Kubernetes Reconciler, handles the basic logic.
KubernetesObjectReconciler( controller: authentik.outposts.controllers.kubernetes.KubernetesController)
def
get_patch(self):
47 def get_patch(self): 48 """Get any patches that apply to this CRD""" 49 patches = self.controller.outpost.config.kubernetes_json_patches 50 if not patches: 51 return None 52 return patches.get(self.reconciler_name(), None)
Get any patches that apply to this CRD
is_embedded: bool
54 @property 55 def is_embedded(self) -> bool: 56 """Return true if the current outpost is embedded""" 57 return self.controller.outpost.managed == MANAGED_OUTPOST
Return true if the current outpost is embedded
@staticmethod
def
reconciler_name() -> str:
59 @staticmethod 60 def reconciler_name() -> str: 61 """A name this reconciler is identified by in the configuration""" 62 raise NotImplementedError
A name this reconciler is identified by in the configuration
noop: bool
64 @property 65 def noop(self) -> bool: 66 """Return true if this object should not be created/updated/deleted in this cluster""" 67 return False
Return true if this object should not be created/updated/deleted in this cluster
name: str
69 @property 70 def name(self) -> str: 71 """Get the name of the object this reconciler manages""" 72 73 base_name = ( 74 self.controller.outpost.config.object_naming_template 75 % { 76 "name": slugify(self.controller.outpost.name), 77 "uuid": self.controller.outpost.uuid.hex, 78 } 79 ).lower() 80 81 formatted = slugify(base_name) 82 formatted = re.sub(r"[^a-z0-9-]", "-", formatted) 83 formatted = re.sub(r"-+", "-", formatted) 84 formatted = formatted[:63] 85 86 if not formatted: 87 formatted = f"outpost-{self.controller.outpost.uuid.hex}"[:63] 88 89 return formatted
Get the name of the object this reconciler manages
def
get_patched_reference_object(self) -> T:
91 def get_patched_reference_object(self) -> T: 92 """Get patched reference object""" 93 reference = self.get_reference_object() 94 patch = self.get_patch() 95 try: 96 json = ApiClient().sanitize_for_serialization(reference) 97 # Custom objects will not be known to the clients openapi types 98 except AttributeError: 99 json = asdict(reference) 100 try: 101 ref = json 102 if patch is not None: 103 ref = apply_patch(json, patch) 104 except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc: 105 raise ControllerException(f"JSON Patch failed: {exc}") from exc 106 mock_response = Response() 107 mock_response.data = dumps(ref) 108 109 try: 110 result = ApiClient().deserialize(mock_response, reference.__class__.__name__) 111 # Custom objects will not be known to the clients openapi types 112 except AttributeError: 113 result = from_dict(reference.__class__, data=ref) 114 115 return result
Get patched reference object
def
up(self):
117 def up(self): 118 """Create object if it doesn't exist, update if needed or recreate if needed.""" 119 current = None 120 if self.noop: 121 self.logger.debug("Object is noop") 122 return 123 reference = self.get_patched_reference_object() 124 try: 125 try: 126 current = self.retrieve() 127 except (OpenApiException, HTTPError) as exc: 128 if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code: 129 self.logger.debug("Failed to get current, triggering recreate") 130 raise NeedsRecreate from exc 131 self.logger.debug("Other unhandled error", exc=exc) 132 raise exc 133 self.reconcile(current, reference) 134 except NeedsUpdate: 135 try: 136 self.update(current, reference) 137 self.logger.debug("Updating") 138 except (OpenApiException, HTTPError) as exc: 139 if isinstance(exc, ApiException) and exc.status == 422: # noqa: PLR2004 140 self.logger.debug("Failed to update current, triggering re-create") 141 self._recreate(current=current, reference=reference) 142 return 143 self.logger.debug("Other unhandled error", exc=exc) 144 raise exc 145 except NeedsRecreate: 146 self._recreate(current=current, reference=reference) 147 else: 148 self.logger.debug("Object is up-to-date.")
Create object if it doesn't exist, update if needed or recreate if needed.
def
down(self):
161 def down(self): 162 """Delete object if found""" 163 if self.noop: 164 self.logger.debug("Object is noop") 165 return 166 try: 167 current = self.retrieve() 168 self.delete(current) 169 self.logger.debug("Removing") 170 except (OpenApiException, HTTPError) as exc: 171 if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code: 172 self.logger.debug("Failed to get current, assuming non-existent") 173 return 174 self.logger.debug("Other unhandled error", exc=exc) 175 raise exc
Delete object if found
def
get_reference_object(self) -> T:
177 def get_reference_object(self) -> T: 178 """Return object as it should be""" 179 raise NotImplementedError
Return object as it should be
def
reconcile(self, current: T, reference: T):
181 def reconcile(self, current: T, reference: T): 182 """Check what operations should be done, should be raised as 183 ReconcileTrigger""" 184 if current.metadata.labels != reference.metadata.labels: 185 raise NeedsUpdate() 186 187 patch = self.get_patch() 188 if patch is not None: 189 try: 190 current_json = ApiClient().sanitize_for_serialization(current) 191 except AttributeError: 192 current_json = asdict(current) 193 try: 194 if apply_patch(current_json, patch) != current_json: 195 raise NeedsUpdate() 196 except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc: 197 raise ControllerException(f"JSON Patch failed: {exc}") from exc
Check what operations should be done, should be raised as ReconcileTrigger
def
create(self, reference: T):
199 def create(self, reference: T): 200 """API Wrapper to create object""" 201 raise NotImplementedError
API Wrapper to create object
def
delete(self, reference: T):
207 def delete(self, reference: T): 208 """API Wrapper to delete object""" 209 raise NotImplementedError
API Wrapper to delete object
def
update(self, current: T, reference: T):
211 def update(self, current: T, reference: T): 212 """API Wrapper to update object""" 213 raise NotImplementedError
API Wrapper to update object
def
get_object_meta(self, **kwargs) -> kubernetes.client.models.v1_object_meta.V1ObjectMeta:
215 def get_object_meta(self, **kwargs) -> V1ObjectMeta: 216 """Get common object metadata""" 217 return V1ObjectMeta( 218 namespace=self.namespace, 219 labels={ 220 "app.kubernetes.io/instance": slugify(self.controller.outpost.name), 221 "app.kubernetes.io/managed-by": "goauthentik.io", 222 "app.kubernetes.io/name": f"authentik-{self.controller.outpost.type.lower()}", 223 "app.kubernetes.io/version": get_version().replace("+", "-"), 224 "goauthentik.io/outpost-name": slugify(self.controller.outpost.name), 225 "goauthentik.io/outpost-type": str(self.controller.outpost.type), 226 "goauthentik.io/outpost-uuid": self.controller.outpost.uuid.hex, 227 }, 228 **kwargs, 229 )
Get common object metadata