authentik.outposts.controllers.k8s.base

Base Kubernetes Reconciler

  1"""Base Kubernetes Reconciler"""
  2
  3import re
  4from dataclasses import asdict
  5from json import dumps
  6from typing import TYPE_CHECKING, TypeVar
  7
  8from dacite.core import from_dict
  9from django.http import HttpResponseNotFound
 10from django.utils.text import slugify
 11from jsonpatch import JsonPatchConflict, JsonPatchException, JsonPatchTestFailed, apply_patch
 12from kubernetes.client import ApiClient, V1ObjectMeta
 13from kubernetes.client.exceptions import ApiException, OpenApiException
 14from kubernetes.client.models.v1_deployment import V1Deployment
 15from kubernetes.client.models.v1_pod import V1Pod
 16from requests import Response
 17from structlog.stdlib import get_logger
 18from urllib3.exceptions import HTTPError
 19
 20from authentik import authentik_version
 21from authentik.outposts.apps import MANAGED_OUTPOST
 22from authentik.outposts.controllers.base import ControllerException
 23from authentik.outposts.controllers.k8s.triggers import NeedsRecreate, NeedsUpdate
 24
 25if TYPE_CHECKING:
 26    from authentik.outposts.controllers.kubernetes import KubernetesController
 27
 28T = TypeVar("T", V1Pod, V1Deployment)
 29
 30
 31def get_version() -> str:
 32    """Wrapper for authentik_version() to make testing easier"""
 33    return authentik_version()
 34
 35
 36class KubernetesObjectReconciler[T]:
 37    """Base Kubernetes Reconciler, handles the basic logic."""
 38
 39    controller: KubernetesController
 40
 41    def __init__(self, controller: KubernetesController):
 42        self.controller = controller
 43        self.namespace = controller.outpost.config.kubernetes_namespace
 44        self.logger = get_logger().bind(type=self.__class__.__name__)
 45
 46    def get_patch(self):
 47        """Get any patches that apply to this CRD"""
 48        patches = self.controller.outpost.config.kubernetes_json_patches
 49        if not patches:
 50            return None
 51        return patches.get(self.reconciler_name(), None)
 52
 53    @property
 54    def is_embedded(self) -> bool:
 55        """Return true if the current outpost is embedded"""
 56        return self.controller.outpost.managed == MANAGED_OUTPOST
 57
 58    @staticmethod
 59    def reconciler_name() -> str:
 60        """A name this reconciler is identified by in the configuration"""
 61        raise NotImplementedError
 62
 63    @property
 64    def noop(self) -> bool:
 65        """Return true if this object should not be created/updated/deleted in this cluster"""
 66        return False
 67
 68    @property
 69    def name(self) -> str:
 70        """Get the name of the object this reconciler manages"""
 71
 72        base_name = (
 73            self.controller.outpost.config.object_naming_template
 74            % {
 75                "name": slugify(self.controller.outpost.name),
 76                "uuid": self.controller.outpost.uuid.hex,
 77            }
 78        ).lower()
 79
 80        formatted = slugify(base_name)
 81        formatted = re.sub(r"[^a-z0-9-]", "-", formatted)
 82        formatted = re.sub(r"-+", "-", formatted)
 83        formatted = formatted[:63]
 84
 85        if not formatted:
 86            formatted = f"outpost-{self.controller.outpost.uuid.hex}"[:63]
 87
 88        return formatted
 89
 90    def get_patched_reference_object(self) -> T:
 91        """Get patched reference object"""
 92        reference = self.get_reference_object()
 93        patch = self.get_patch()
 94        try:
 95            json = ApiClient().sanitize_for_serialization(reference)
 96        # Custom objects will not be known to the clients openapi types
 97        except AttributeError:
 98            json = asdict(reference)
 99        try:
100            ref = json
101            if patch is not None:
102                ref = apply_patch(json, patch)
103        except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc:
104            raise ControllerException(f"JSON Patch failed: {exc}") from exc
105        mock_response = Response()
106        mock_response.data = dumps(ref)
107
108        try:
109            result = ApiClient().deserialize(mock_response, reference.__class__.__name__)
110        # Custom objects will not be known to the clients openapi types
111        except AttributeError:
112            result = from_dict(reference.__class__, data=ref)
113
114        return result
115
116    def up(self):
117        """Create object if it doesn't exist, update if needed or recreate if needed."""
118        current = None
119        if self.noop:
120            self.logger.debug("Object is noop")
121            return
122        reference = self.get_patched_reference_object()
123        try:
124            try:
125                current = self.retrieve()
126            except (OpenApiException, HTTPError) as exc:
127                if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code:
128                    self.logger.debug("Failed to get current, triggering recreate")
129                    raise NeedsRecreate from exc
130                self.logger.debug("Other unhandled error", exc=exc)
131                raise exc
132            self.reconcile(current, reference)
133        except NeedsUpdate:
134            try:
135                self.update(current, reference)
136                self.logger.debug("Updating")
137            except (OpenApiException, HTTPError) as exc:
138                if isinstance(exc, ApiException) and exc.status == 422:  # noqa: PLR2004
139                    self.logger.debug("Failed to update current, triggering re-create")
140                    self._recreate(current=current, reference=reference)
141                    return
142                self.logger.debug("Other unhandled error", exc=exc)
143                raise exc
144        except NeedsRecreate:
145            self._recreate(current=current, reference=reference)
146        else:
147            self.logger.debug("Object is up-to-date.")
148
149    def _recreate(self, reference: T, current: T | None = None):
150        """Recreate object"""
151        self.logger.debug("Recreate requested")
152        if current:
153            self.logger.debug("Deleted old")
154            self.delete(current)
155        else:
156            self.logger.debug("No old found, creating")
157        self.logger.debug("Creating")
158        self.create(reference)
159
160    def down(self):
161        """Delete object if found"""
162        if self.noop:
163            self.logger.debug("Object is noop")
164            return
165        try:
166            current = self.retrieve()
167            self.delete(current)
168            self.logger.debug("Removing")
169        except (OpenApiException, HTTPError) as exc:
170            if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code:
171                self.logger.debug("Failed to get current, assuming non-existent")
172                return
173            self.logger.debug("Other unhandled error", exc=exc)
174            raise exc
175
176    def get_reference_object(self) -> T:
177        """Return object as it should be"""
178        raise NotImplementedError
179
180    def reconcile(self, current: T, reference: T):
181        """Check what operations should be done, should be raised as
182        ReconcileTrigger"""
183        if current.metadata.labels != reference.metadata.labels:
184            raise NeedsUpdate()
185
186        patch = self.get_patch()
187        if patch is not None:
188            try:
189                current_json = ApiClient().sanitize_for_serialization(current)
190            except AttributeError:
191                current_json = asdict(current)
192            try:
193                if apply_patch(current_json, patch) != current_json:
194                    raise NeedsUpdate()
195            except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc:
196                raise ControllerException(f"JSON Patch failed: {exc}") from exc
197
198    def create(self, reference: T):
199        """API Wrapper to create object"""
200        raise NotImplementedError
201
202    def retrieve(self) -> T:
203        """API Wrapper to retrieve object"""
204        raise NotImplementedError
205
206    def delete(self, reference: T):
207        """API Wrapper to delete object"""
208        raise NotImplementedError
209
210    def update(self, current: T, reference: T):
211        """API Wrapper to update object"""
212        raise NotImplementedError
213
214    def get_object_meta(self, **kwargs) -> V1ObjectMeta:
215        """Get common object metadata"""
216        return V1ObjectMeta(
217            namespace=self.namespace,
218            labels={
219                "app.kubernetes.io/instance": slugify(self.controller.outpost.name),
220                "app.kubernetes.io/managed-by": "goauthentik.io",
221                "app.kubernetes.io/name": f"authentik-{self.controller.outpost.type.lower()}",
222                "app.kubernetes.io/version": get_version().replace("+", "-"),
223                "goauthentik.io/outpost-name": slugify(self.controller.outpost.name),
224                "goauthentik.io/outpost-type": str(self.controller.outpost.type),
225                "goauthentik.io/outpost-uuid": self.controller.outpost.uuid.hex,
226            },
227            **kwargs,
228        )
def get_version() -> str:
32def get_version() -> str:
33    """Wrapper for authentik_version() to make testing easier"""
34    return authentik_version()

Wrapper for authentik_version() to make testing easier

class KubernetesObjectReconciler(typing.Generic[T]):
 37class KubernetesObjectReconciler[T]:
 38    """Base Kubernetes Reconciler, handles the basic logic."""
 39
 40    controller: KubernetesController
 41
 42    def __init__(self, controller: KubernetesController):
 43        self.controller = controller
 44        self.namespace = controller.outpost.config.kubernetes_namespace
 45        self.logger = get_logger().bind(type=self.__class__.__name__)
 46
 47    def get_patch(self):
 48        """Get any patches that apply to this CRD"""
 49        patches = self.controller.outpost.config.kubernetes_json_patches
 50        if not patches:
 51            return None
 52        return patches.get(self.reconciler_name(), None)
 53
 54    @property
 55    def is_embedded(self) -> bool:
 56        """Return true if the current outpost is embedded"""
 57        return self.controller.outpost.managed == MANAGED_OUTPOST
 58
 59    @staticmethod
 60    def reconciler_name() -> str:
 61        """A name this reconciler is identified by in the configuration"""
 62        raise NotImplementedError
 63
 64    @property
 65    def noop(self) -> bool:
 66        """Return true if this object should not be created/updated/deleted in this cluster"""
 67        return False
 68
 69    @property
 70    def name(self) -> str:
 71        """Get the name of the object this reconciler manages"""
 72
 73        base_name = (
 74            self.controller.outpost.config.object_naming_template
 75            % {
 76                "name": slugify(self.controller.outpost.name),
 77                "uuid": self.controller.outpost.uuid.hex,
 78            }
 79        ).lower()
 80
 81        formatted = slugify(base_name)
 82        formatted = re.sub(r"[^a-z0-9-]", "-", formatted)
 83        formatted = re.sub(r"-+", "-", formatted)
 84        formatted = formatted[:63]
 85
 86        if not formatted:
 87            formatted = f"outpost-{self.controller.outpost.uuid.hex}"[:63]
 88
 89        return formatted
 90
 91    def get_patched_reference_object(self) -> T:
 92        """Get patched reference object"""
 93        reference = self.get_reference_object()
 94        patch = self.get_patch()
 95        try:
 96            json = ApiClient().sanitize_for_serialization(reference)
 97        # Custom objects will not be known to the clients openapi types
 98        except AttributeError:
 99            json = asdict(reference)
100        try:
101            ref = json
102            if patch is not None:
103                ref = apply_patch(json, patch)
104        except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc:
105            raise ControllerException(f"JSON Patch failed: {exc}") from exc
106        mock_response = Response()
107        mock_response.data = dumps(ref)
108
109        try:
110            result = ApiClient().deserialize(mock_response, reference.__class__.__name__)
111        # Custom objects will not be known to the clients openapi types
112        except AttributeError:
113            result = from_dict(reference.__class__, data=ref)
114
115        return result
116
117    def up(self):
118        """Create object if it doesn't exist, update if needed or recreate if needed."""
119        current = None
120        if self.noop:
121            self.logger.debug("Object is noop")
122            return
123        reference = self.get_patched_reference_object()
124        try:
125            try:
126                current = self.retrieve()
127            except (OpenApiException, HTTPError) as exc:
128                if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code:
129                    self.logger.debug("Failed to get current, triggering recreate")
130                    raise NeedsRecreate from exc
131                self.logger.debug("Other unhandled error", exc=exc)
132                raise exc
133            self.reconcile(current, reference)
134        except NeedsUpdate:
135            try:
136                self.update(current, reference)
137                self.logger.debug("Updating")
138            except (OpenApiException, HTTPError) as exc:
139                if isinstance(exc, ApiException) and exc.status == 422:  # noqa: PLR2004
140                    self.logger.debug("Failed to update current, triggering re-create")
141                    self._recreate(current=current, reference=reference)
142                    return
143                self.logger.debug("Other unhandled error", exc=exc)
144                raise exc
145        except NeedsRecreate:
146            self._recreate(current=current, reference=reference)
147        else:
148            self.logger.debug("Object is up-to-date.")
149
150    def _recreate(self, reference: T, current: T | None = None):
151        """Recreate object"""
152        self.logger.debug("Recreate requested")
153        if current:
154            self.logger.debug("Deleted old")
155            self.delete(current)
156        else:
157            self.logger.debug("No old found, creating")
158        self.logger.debug("Creating")
159        self.create(reference)
160
161    def down(self):
162        """Delete object if found"""
163        if self.noop:
164            self.logger.debug("Object is noop")
165            return
166        try:
167            current = self.retrieve()
168            self.delete(current)
169            self.logger.debug("Removing")
170        except (OpenApiException, HTTPError) as exc:
171            if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code:
172                self.logger.debug("Failed to get current, assuming non-existent")
173                return
174            self.logger.debug("Other unhandled error", exc=exc)
175            raise exc
176
177    def get_reference_object(self) -> T:
178        """Return object as it should be"""
179        raise NotImplementedError
180
181    def reconcile(self, current: T, reference: T):
182        """Check what operations should be done, should be raised as
183        ReconcileTrigger"""
184        if current.metadata.labels != reference.metadata.labels:
185            raise NeedsUpdate()
186
187        patch = self.get_patch()
188        if patch is not None:
189            try:
190                current_json = ApiClient().sanitize_for_serialization(current)
191            except AttributeError:
192                current_json = asdict(current)
193            try:
194                if apply_patch(current_json, patch) != current_json:
195                    raise NeedsUpdate()
196            except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc:
197                raise ControllerException(f"JSON Patch failed: {exc}") from exc
198
199    def create(self, reference: T):
200        """API Wrapper to create object"""
201        raise NotImplementedError
202
203    def retrieve(self) -> T:
204        """API Wrapper to retrieve object"""
205        raise NotImplementedError
206
207    def delete(self, reference: T):
208        """API Wrapper to delete object"""
209        raise NotImplementedError
210
211    def update(self, current: T, reference: T):
212        """API Wrapper to update object"""
213        raise NotImplementedError
214
215    def get_object_meta(self, **kwargs) -> V1ObjectMeta:
216        """Get common object metadata"""
217        return V1ObjectMeta(
218            namespace=self.namespace,
219            labels={
220                "app.kubernetes.io/instance": slugify(self.controller.outpost.name),
221                "app.kubernetes.io/managed-by": "goauthentik.io",
222                "app.kubernetes.io/name": f"authentik-{self.controller.outpost.type.lower()}",
223                "app.kubernetes.io/version": get_version().replace("+", "-"),
224                "goauthentik.io/outpost-name": slugify(self.controller.outpost.name),
225                "goauthentik.io/outpost-type": str(self.controller.outpost.type),
226                "goauthentik.io/outpost-uuid": self.controller.outpost.uuid.hex,
227            },
228            **kwargs,
229        )

Base Kubernetes Reconciler, handles the basic logic.

KubernetesObjectReconciler( controller: authentik.outposts.controllers.kubernetes.KubernetesController)
42    def __init__(self, controller: KubernetesController):
43        self.controller = controller
44        self.namespace = controller.outpost.config.kubernetes_namespace
45        self.logger = get_logger().bind(type=self.__class__.__name__)
namespace
logger
def get_patch(self):
47    def get_patch(self):
48        """Get any patches that apply to this CRD"""
49        patches = self.controller.outpost.config.kubernetes_json_patches
50        if not patches:
51            return None
52        return patches.get(self.reconciler_name(), None)

Get any patches that apply to this CRD

is_embedded: bool
54    @property
55    def is_embedded(self) -> bool:
56        """Return true if the current outpost is embedded"""
57        return self.controller.outpost.managed == MANAGED_OUTPOST

Return true if the current outpost is embedded

@staticmethod
def reconciler_name() -> str:
59    @staticmethod
60    def reconciler_name() -> str:
61        """A name this reconciler is identified by in the configuration"""
62        raise NotImplementedError

A name this reconciler is identified by in the configuration

noop: bool
64    @property
65    def noop(self) -> bool:
66        """Return true if this object should not be created/updated/deleted in this cluster"""
67        return False

Return true if this object should not be created/updated/deleted in this cluster

name: str
69    @property
70    def name(self) -> str:
71        """Get the name of the object this reconciler manages"""
72
73        base_name = (
74            self.controller.outpost.config.object_naming_template
75            % {
76                "name": slugify(self.controller.outpost.name),
77                "uuid": self.controller.outpost.uuid.hex,
78            }
79        ).lower()
80
81        formatted = slugify(base_name)
82        formatted = re.sub(r"[^a-z0-9-]", "-", formatted)
83        formatted = re.sub(r"-+", "-", formatted)
84        formatted = formatted[:63]
85
86        if not formatted:
87            formatted = f"outpost-{self.controller.outpost.uuid.hex}"[:63]
88
89        return formatted

Get the name of the object this reconciler manages

def get_patched_reference_object(self) -> T:
 91    def get_patched_reference_object(self) -> T:
 92        """Get patched reference object"""
 93        reference = self.get_reference_object()
 94        patch = self.get_patch()
 95        try:
 96            json = ApiClient().sanitize_for_serialization(reference)
 97        # Custom objects will not be known to the clients openapi types
 98        except AttributeError:
 99            json = asdict(reference)
100        try:
101            ref = json
102            if patch is not None:
103                ref = apply_patch(json, patch)
104        except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc:
105            raise ControllerException(f"JSON Patch failed: {exc}") from exc
106        mock_response = Response()
107        mock_response.data = dumps(ref)
108
109        try:
110            result = ApiClient().deserialize(mock_response, reference.__class__.__name__)
111        # Custom objects will not be known to the clients openapi types
112        except AttributeError:
113            result = from_dict(reference.__class__, data=ref)
114
115        return result

Get patched reference object

def up(self):
117    def up(self):
118        """Create object if it doesn't exist, update if needed or recreate if needed."""
119        current = None
120        if self.noop:
121            self.logger.debug("Object is noop")
122            return
123        reference = self.get_patched_reference_object()
124        try:
125            try:
126                current = self.retrieve()
127            except (OpenApiException, HTTPError) as exc:
128                if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code:
129                    self.logger.debug("Failed to get current, triggering recreate")
130                    raise NeedsRecreate from exc
131                self.logger.debug("Other unhandled error", exc=exc)
132                raise exc
133            self.reconcile(current, reference)
134        except NeedsUpdate:
135            try:
136                self.update(current, reference)
137                self.logger.debug("Updating")
138            except (OpenApiException, HTTPError) as exc:
139                if isinstance(exc, ApiException) and exc.status == 422:  # noqa: PLR2004
140                    self.logger.debug("Failed to update current, triggering re-create")
141                    self._recreate(current=current, reference=reference)
142                    return
143                self.logger.debug("Other unhandled error", exc=exc)
144                raise exc
145        except NeedsRecreate:
146            self._recreate(current=current, reference=reference)
147        else:
148            self.logger.debug("Object is up-to-date.")

Create object if it doesn't exist, update if needed or recreate if needed.

def down(self):
161    def down(self):
162        """Delete object if found"""
163        if self.noop:
164            self.logger.debug("Object is noop")
165            return
166        try:
167            current = self.retrieve()
168            self.delete(current)
169            self.logger.debug("Removing")
170        except (OpenApiException, HTTPError) as exc:
171            if isinstance(exc, ApiException) and exc.status == HttpResponseNotFound.status_code:
172                self.logger.debug("Failed to get current, assuming non-existent")
173                return
174            self.logger.debug("Other unhandled error", exc=exc)
175            raise exc

Delete object if found

def get_reference_object(self) -> T:
177    def get_reference_object(self) -> T:
178        """Return object as it should be"""
179        raise NotImplementedError

Return object as it should be

def reconcile(self, current: T, reference: T):
181    def reconcile(self, current: T, reference: T):
182        """Check what operations should be done, should be raised as
183        ReconcileTrigger"""
184        if current.metadata.labels != reference.metadata.labels:
185            raise NeedsUpdate()
186
187        patch = self.get_patch()
188        if patch is not None:
189            try:
190                current_json = ApiClient().sanitize_for_serialization(current)
191            except AttributeError:
192                current_json = asdict(current)
193            try:
194                if apply_patch(current_json, patch) != current_json:
195                    raise NeedsUpdate()
196            except (JsonPatchException, JsonPatchConflict, JsonPatchTestFailed) as exc:
197                raise ControllerException(f"JSON Patch failed: {exc}") from exc

Check what operations should be done, should be raised as ReconcileTrigger

def create(self, reference: T):
199    def create(self, reference: T):
200        """API Wrapper to create object"""
201        raise NotImplementedError

API Wrapper to create object

def retrieve(self) -> T:
203    def retrieve(self) -> T:
204        """API Wrapper to retrieve object"""
205        raise NotImplementedError

API Wrapper to retrieve object

def delete(self, reference: T):
207    def delete(self, reference: T):
208        """API Wrapper to delete object"""
209        raise NotImplementedError

API Wrapper to delete object

def update(self, current: T, reference: T):
211    def update(self, current: T, reference: T):
212        """API Wrapper to update object"""
213        raise NotImplementedError

API Wrapper to update object

def get_object_meta(self, **kwargs) -> kubernetes.client.models.v1_object_meta.V1ObjectMeta:
215    def get_object_meta(self, **kwargs) -> V1ObjectMeta:
216        """Get common object metadata"""
217        return V1ObjectMeta(
218            namespace=self.namespace,
219            labels={
220                "app.kubernetes.io/instance": slugify(self.controller.outpost.name),
221                "app.kubernetes.io/managed-by": "goauthentik.io",
222                "app.kubernetes.io/name": f"authentik-{self.controller.outpost.type.lower()}",
223                "app.kubernetes.io/version": get_version().replace("+", "-"),
224                "goauthentik.io/outpost-name": slugify(self.controller.outpost.name),
225                "goauthentik.io/outpost-type": str(self.controller.outpost.type),
226                "goauthentik.io/outpost-uuid": self.controller.outpost.uuid.hex,
227            },
228            **kwargs,
229        )

Get common object metadata