authentik.sources.kerberos.models

authentik Kerberos Source Models

  1"""authentik Kerberos Source Models"""
  2
  3import os
  4from base64 import b64decode
  5from pathlib import Path
  6from tempfile import gettempdir
  7from typing import Any
  8
  9import gssapi
 10import pglock
 11from django.db import connection, models
 12from django.http import HttpRequest
 13from django.shortcuts import reverse
 14from django.templatetags.static import static
 15from django.utils.timezone import now
 16from django.utils.translation import gettext_lazy as _
 17from kadmin import KAdm5Variant, KAdmin, KAdminApiVersion
 18from kadmin import exceptions as kadmin_exceptions
 19from rest_framework.serializers import Serializer
 20from structlog.stdlib import get_logger
 21
 22from authentik.core.models import (
 23    GroupSourceConnection,
 24    PropertyMapping,
 25    UserSourceConnection,
 26    UserTypes,
 27)
 28from authentik.core.types import UILoginButton, UserSettingSerializer
 29from authentik.flows.challenge import RedirectChallenge
 30from authentik.lib.sync.incoming.models import IncomingSyncSource
 31from authentik.lib.utils.time import fqdn_rand
 32from authentik.tasks.schedules.common import ScheduleSpec
 33
LOGGER = get_logger()


# Creating kadmin connections is expensive. As such, this global is used to reuse
# existing kadmin connections instead of creating new ones.
# Keys are `str(source.pk)` of the owning KerberosSource; values are whatever
# `KerberosSource._kadmin_init()` returned for that source.
_kadmin_connections: dict[str, Any] = {}
 40
 41
class KAdminType(models.TextChoices):
    """Kadmin server types selectable for a Kerberos source."""

    MIT = "MIT"
    HEIMDAL = "Heimdal"
 45
 46
 47class KerberosSource(IncomingSyncSource):
 48    """Federate Kerberos realm with authentik"""
 49
 50    realm = models.TextField(help_text=_("Kerberos realm"), unique=True)
 51    krb5_conf = models.TextField(
 52        blank=True,
 53        help_text=_("Custom krb5.conf to use. Uses the system one by default"),
 54    )
 55    kadmin_type = models.TextField(
 56        choices=KAdminType.choices, default=KAdminType.MIT, help_text=_("KAdmin server type")
 57    )
 58
 59    sync_users = models.BooleanField(
 60        default=False, help_text=_("Sync users from Kerberos into authentik"), db_index=True
 61    )
 62    sync_users_password = models.BooleanField(
 63        default=True,
 64        help_text=_("When a user changes their password, sync it back to Kerberos"),
 65        db_index=True,
 66    )
 67    sync_principal = models.TextField(
 68        help_text=_("Principal to authenticate to kadmin for sync."), blank=True
 69    )
 70    sync_password = models.TextField(
 71        help_text=_("Password to authenticate to kadmin for sync"), blank=True
 72    )
 73    sync_keytab = models.TextField(
 74        help_text=_(
 75            "Keytab to authenticate to kadmin for sync. "
 76            "Must be base64-encoded or in the form TYPE:residual"
 77        ),
 78        blank=True,
 79    )
 80    sync_ccache = models.TextField(
 81        help_text=_(
 82            "Credentials cache to authenticate to kadmin for sync. "
 83            "Must be in the form TYPE:residual"
 84        ),
 85        blank=True,
 86    )
 87
 88    spnego_server_name = models.TextField(
 89        help_text=_(
 90            "Force the use of a specific server name for SPNEGO. Must be in the form HTTP@hostname"
 91        ),
 92        blank=True,
 93    )
 94    spnego_keytab = models.TextField(
 95        help_text=_("SPNEGO keytab base64-encoded or path to keytab in the form FILE:path"),
 96        blank=True,
 97    )
 98    spnego_ccache = models.TextField(
 99        help_text=_("Credential cache to use for SPNEGO in form type:residual"),
100        blank=True,
101    )
102
103    password_login_update_internal_password = models.BooleanField(
104        default=False,
105        help_text=_(
106            "If enabled, the authentik-stored password will be updated upon "
107            "login with the Kerberos password backend"
108        ),
109    )
110
111    class Meta:
112        verbose_name = _("Kerberos Source")
113        verbose_name_plural = _("Kerberos Sources")
114
115    def __str__(self):
116        return f"Kerberos Source {self.name}"
117
118    @property
119    def component(self) -> str:
120        return "ak-source-kerberos-form"
121
122    @property
123    def serializer(self) -> type[Serializer]:
124        from authentik.sources.kerberos.api.source import KerberosSourceSerializer
125
126        return KerberosSourceSerializer
127
128    @property
129    def property_mapping_type(self) -> type[PropertyMapping]:
130        return KerberosSourcePropertyMapping
131
132    @property
133    def icon_url(self) -> str:
134        icon = super().icon_url
135        if not icon:
136            return static("authentik/sources/kerberos.png")
137        return icon
138
139    @property
140    def schedule_specs(self) -> list[ScheduleSpec]:
141        from authentik.sources.kerberos.tasks import kerberos_connectivity_check, kerberos_sync
142
143        return [
144            ScheduleSpec(
145                actor=kerberos_sync,
146                uid=self.slug,
147                args=(self.pk,),
148                crontab=f"{fqdn_rand('kerberos_sync/' + str(self.pk))} */2 * * *",
149                send_on_save=True,
150            ),
151            ScheduleSpec(
152                actor=kerberos_connectivity_check,
153                uid=self.slug,
154                args=(self.pk,),
155                crontab=f"{fqdn_rand('kerberos_connectivity_check/' + str(self.pk))} * * * *",
156                send_on_save=True,
157            ),
158        ]
159
160    def ui_login_button(self, request: HttpRequest) -> UILoginButton:
161        return UILoginButton(
162            challenge=RedirectChallenge(
163                data={
164                    "to": reverse(
165                        "authentik_sources_kerberos:spnego-login",
166                        kwargs={"source_slug": self.slug},
167                    ),
168                }
169            ),
170            name=self.name,
171            icon_url=self.icon_url,
172            promoted=self.promoted,
173        )
174
175    def ui_user_settings(self) -> UserSettingSerializer | None:
176        return UserSettingSerializer(
177            data={
178                "title": self.name,
179                "component": "ak-user-settings-source-kerberos",
180                "configure_url": reverse(
181                    "authentik_sources_kerberos:spnego-login",
182                    kwargs={"source_slug": self.slug},
183                ),
184                "icon_url": self.icon_url,
185            }
186        )
187
188    @property
189    def sync_lock(self) -> pglock.advisory:
190        """Lock for syncing Kerberos to prevent multiple parallel syncs happening"""
191        return pglock.advisory(
192            lock_id=f"goauthentik.io/{connection.schema_name}/sources/kerberos/sync/{self.slug}",
193            timeout=0,
194            side_effect=pglock.Return,
195        )
196
197    def get_base_user_properties(self, principal: str, **kwargs):
198        localpart, _ = principal.rsplit("@", 1)
199
200        properties = {
201            "username": localpart,
202            "type": UserTypes.INTERNAL,
203            "path": self.get_user_path(),
204        }
205
206        if "principal_obj" in kwargs:
207            princ_expiry = kwargs["principal_obj"].expire_time
208            properties["is_active"] = princ_expiry is None or princ_expiry > now()
209
210        return properties
211
212    def get_base_group_properties(self, group_id: str, **kwargs):
213        return {
214            "name": group_id,
215        }
216
217    @property
218    def tempdir(self) -> Path:
219        """Get temporary storage for Kerberos files"""
220        path = (
221            Path(gettempdir())
222            / "authentik"
223            / connection.schema_name
224            / "sources"
225            / "kerberos"
226            / str(self.pk)
227        )
228        path.mkdir(mode=0o700, parents=True, exist_ok=True)
229        return path
230
231    @property
232    def krb5_conf_path(self) -> str | None:
233        """Get krb5.conf path"""
234        if not self.krb5_conf:
235            return None
236        conf_path = self.tempdir / "krb5.conf"
237        conf_path.write_text(self.krb5_conf)
238        return str(conf_path)
239
240    def _kadmin_init(self) -> KAdmin | None:
241        variant = KAdm5Variant.MitClient
242        api_version = KAdminApiVersion.Version2
243        match self.kadmin_type:
244            case KAdminType.MIT:
245                variant = KAdm5Variant.MitClient
246                api_version = KAdminApiVersion.Version4
247            case KAdminType.HEIMDAL:
248                variant = KAdm5Variant.HeimdalClient
249                api_version = KAdminApiVersion.Version2
250        # kadmin doesn't use a ccache for its connection
251        # as such, we don't need to create a separate ccache for each source
252        if not self.sync_principal:
253            return None
254        if self.sync_password:
255            return KAdmin.with_password(
256                variant,
257                self.sync_principal,
258                self.sync_password,
259                api_version=api_version,
260            )
261        if self.sync_keytab:
262            keytab = self.sync_keytab
263            if ":" not in keytab:
264                keytab_path = self.tempdir / "kadmin_keytab"
265                keytab_path.touch(mode=0o600)
266                keytab_path.write_bytes(b64decode(self.sync_keytab))
267                keytab = f"FILE:{keytab_path}"
268            return KAdmin.with_keytab(
269                variant,
270                self.sync_principal,
271                keytab,
272                api_version=api_version,
273            )
274        if self.sync_ccache:
275            return KAdmin.with_ccache(
276                variant,
277                self.sync_principal,
278                self.sync_ccache,
279                api_version=api_version,
280            )
281        return None
282
283    def connection(self) -> KAdmin | None:
284        """Get kadmin connection"""
285        if str(self.pk) not in _kadmin_connections:
286            kadm = self._kadmin_init()
287            if kadm is not None:
288                _kadmin_connections[str(self.pk)] = self._kadmin_init()
289        return _kadmin_connections.get(str(self.pk), None)
290
291    def check_connection(self) -> dict[str, str | bool]:
292        """Check Kerberos Connection"""
293        status: dict[str, str | bool] = {"status": "ok"}
294        if not self.sync_users:
295            return status
296        with Krb5ConfContext(self):
297            try:
298                kadm = self.connection()
299                if kadm is None:
300                    status["status"] = "no connection"
301                    return status
302                status["principal_exists"] = kadm.principal_exists(self.sync_principal)
303            except kadmin_exceptions.PyKAdminException as exc:
304                status["status"] = str(exc)
305        return status
306
307    def get_gssapi_store(self) -> dict[str, str]:
308        """Get GSSAPI credentials store for this source"""
309        ccache = self.spnego_ccache
310        keytab = None
311
312        if not ccache:
313            ccache_path = self.tempdir / "spnego_ccache"
314            ccache_path.touch(mode=0o600)
315            ccache = f"FILE:{ccache_path}"
316
317        if self.spnego_keytab:
318            # Keytab is of the form type:residual, use as-is
319            if ":" in self.spnego_keytab:
320                keytab = self.spnego_keytab
321            # Parse the keytab and write it in the file
322            else:
323                keytab_path = self.tempdir / "spnego_keytab"
324                keytab_path.touch(mode=0o600)
325                keytab_path.write_bytes(b64decode(self.spnego_keytab))
326                keytab = f"FILE:{keytab_path}"
327
328        store = {"ccache": ccache}
329        if keytab is not None:
330            store["keytab"] = keytab
331        return store
332
333    def get_gssapi_creds(self) -> gssapi.creds.Credentials | None:
334        """Get GSSAPI credentials for this source"""
335        try:
336            name = None
337            if self.spnego_server_name:
338                # pylint: disable=c-extension-no-member
339                name = gssapi.names.Name(
340                    base=self.spnego_server_name,
341                    name_type=gssapi.raw.types.NameType.hostbased_service,
342                )
343            return gssapi.creds.Credentials(
344                usage="accept", name=name, store=self.get_gssapi_store()
345            )
346        except gssapi.exceptions.GSSError as exc:
347            LOGGER.warning("GSSAPI credentials failure", exc=exc)
348            return None
349
350
class Krb5ConfContext:
    """
    Context manager to set the path to the krb5.conf config file.

    While the context is active, the `KRB5_CONFIG` environment variable points at the
    source's `krb5_conf_path`. On exit the previous value is restored, or the variable
    is removed if it was not set before. If the source has no custom krb5.conf, the
    environment is left untouched.
    """

    def __init__(self, source: "KerberosSource"):
        self._source = source
        # Read once, up front, so enter and exit agree on whether to act.
        self._path = self._source.krb5_conf_path
        self._previous: str | None = None

    def __enter__(self):
        if self._path:
            self._previous = os.environ.get("KRB5_CONFIG")
            os.environ["KRB5_CONFIG"] = self._path
        return self

    def __exit__(self, *args, **kwargs):
        if not self._path:
            return
        # Compare against None rather than truthiness so that a previously set,
        # empty value is restored instead of being dropped.
        if self._previous is not None:
            os.environ["KRB5_CONFIG"] = self._previous
        else:
            # pop() tolerates the variable having been removed inside the block.
            os.environ.pop("KRB5_CONFIG", None)
374
375
class KerberosSourcePropertyMapping(PropertyMapping):
    """Map Kerberos Property to User object attribute"""

    class Meta:
        verbose_name = _("Kerberos Source Property Mapping")
        verbose_name_plural = _("Kerberos Source Property Mappings")

    def __str__(self):
        return str(self.name)

    @property
    def component(self) -> str:
        """Return the component identifier for this property mapping"""
        return "ak-property-mapping-source-kerberos-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class for this property mapping"""
        # Resolved at call time rather than at module import time.
        from authentik.sources.kerberos.api import property_mappings as api

        return api.KerberosSourcePropertyMappingSerializer
397
398
class UserKerberosSourceConnection(UserSourceConnection):
    """Connection to configured Kerberos Sources."""

    class Meta:
        verbose_name = _("User Kerberos Source Connection")
        verbose_name_plural = _("User Kerberos Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class for user connections to Kerberos sources"""
        # Resolved at call time rather than at module import time.
        from authentik.sources.kerberos.api import source_connection as api

        return api.UserKerberosSourceConnectionSerializer
413
414
class GroupKerberosSourceConnection(GroupSourceConnection):
    """Connection to configured Kerberos Sources."""

    class Meta:
        verbose_name = _("Group Kerberos Source Connection")
        verbose_name_plural = _("Group Kerberos Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class for group connections to Kerberos sources"""
        # Resolved at call time rather than at module import time.
        from authentik.sources.kerberos.api import source_connection as api

        return api.GroupKerberosSourceConnectionSerializer
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class KAdminType(django.db.models.enums.TextChoices):
43class KAdminType(models.TextChoices):
44    MIT = "MIT"
45    HEIMDAL = "Heimdal"

Class for creating enumerated string choices.

HEIMDAL = KAdminType.HEIMDAL
class KerberosSource(authentik.lib.sync.incoming.models.IncomingSyncSource):
 48class KerberosSource(IncomingSyncSource):
 49    """Federate Kerberos realm with authentik"""
 50
 51    realm = models.TextField(help_text=_("Kerberos realm"), unique=True)
 52    krb5_conf = models.TextField(
 53        blank=True,
 54        help_text=_("Custom krb5.conf to use. Uses the system one by default"),
 55    )
 56    kadmin_type = models.TextField(
 57        choices=KAdminType.choices, default=KAdminType.MIT, help_text=_("KAdmin server type")
 58    )
 59
 60    sync_users = models.BooleanField(
 61        default=False, help_text=_("Sync users from Kerberos into authentik"), db_index=True
 62    )
 63    sync_users_password = models.BooleanField(
 64        default=True,
 65        help_text=_("When a user changes their password, sync it back to Kerberos"),
 66        db_index=True,
 67    )
 68    sync_principal = models.TextField(
 69        help_text=_("Principal to authenticate to kadmin for sync."), blank=True
 70    )
 71    sync_password = models.TextField(
 72        help_text=_("Password to authenticate to kadmin for sync"), blank=True
 73    )
 74    sync_keytab = models.TextField(
 75        help_text=_(
 76            "Keytab to authenticate to kadmin for sync. "
 77            "Must be base64-encoded or in the form TYPE:residual"
 78        ),
 79        blank=True,
 80    )
 81    sync_ccache = models.TextField(
 82        help_text=_(
 83            "Credentials cache to authenticate to kadmin for sync. "
 84            "Must be in the form TYPE:residual"
 85        ),
 86        blank=True,
 87    )
 88
 89    spnego_server_name = models.TextField(
 90        help_text=_(
 91            "Force the use of a specific server name for SPNEGO. Must be in the form HTTP@hostname"
 92        ),
 93        blank=True,
 94    )
 95    spnego_keytab = models.TextField(
 96        help_text=_("SPNEGO keytab base64-encoded or path to keytab in the form FILE:path"),
 97        blank=True,
 98    )
 99    spnego_ccache = models.TextField(
100        help_text=_("Credential cache to use for SPNEGO in form type:residual"),
101        blank=True,
102    )
103
104    password_login_update_internal_password = models.BooleanField(
105        default=False,
106        help_text=_(
107            "If enabled, the authentik-stored password will be updated upon "
108            "login with the Kerberos password backend"
109        ),
110    )
111
112    class Meta:
113        verbose_name = _("Kerberos Source")
114        verbose_name_plural = _("Kerberos Sources")
115
116    def __str__(self):
117        return f"Kerberos Source {self.name}"
118
119    @property
120    def component(self) -> str:
121        return "ak-source-kerberos-form"
122
123    @property
124    def serializer(self) -> type[Serializer]:
125        from authentik.sources.kerberos.api.source import KerberosSourceSerializer
126
127        return KerberosSourceSerializer
128
129    @property
130    def property_mapping_type(self) -> type[PropertyMapping]:
131        return KerberosSourcePropertyMapping
132
133    @property
134    def icon_url(self) -> str:
135        icon = super().icon_url
136        if not icon:
137            return static("authentik/sources/kerberos.png")
138        return icon
139
140    @property
141    def schedule_specs(self) -> list[ScheduleSpec]:
142        from authentik.sources.kerberos.tasks import kerberos_connectivity_check, kerberos_sync
143
144        return [
145            ScheduleSpec(
146                actor=kerberos_sync,
147                uid=self.slug,
148                args=(self.pk,),
149                crontab=f"{fqdn_rand('kerberos_sync/' + str(self.pk))} */2 * * *",
150                send_on_save=True,
151            ),
152            ScheduleSpec(
153                actor=kerberos_connectivity_check,
154                uid=self.slug,
155                args=(self.pk,),
156                crontab=f"{fqdn_rand('kerberos_connectivity_check/' + str(self.pk))} * * * *",
157                send_on_save=True,
158            ),
159        ]
160
161    def ui_login_button(self, request: HttpRequest) -> UILoginButton:
162        return UILoginButton(
163            challenge=RedirectChallenge(
164                data={
165                    "to": reverse(
166                        "authentik_sources_kerberos:spnego-login",
167                        kwargs={"source_slug": self.slug},
168                    ),
169                }
170            ),
171            name=self.name,
172            icon_url=self.icon_url,
173            promoted=self.promoted,
174        )
175
176    def ui_user_settings(self) -> UserSettingSerializer | None:
177        return UserSettingSerializer(
178            data={
179                "title": self.name,
180                "component": "ak-user-settings-source-kerberos",
181                "configure_url": reverse(
182                    "authentik_sources_kerberos:spnego-login",
183                    kwargs={"source_slug": self.slug},
184                ),
185                "icon_url": self.icon_url,
186            }
187        )
188
189    @property
190    def sync_lock(self) -> pglock.advisory:
191        """Lock for syncing Kerberos to prevent multiple parallel syncs happening"""
192        return pglock.advisory(
193            lock_id=f"goauthentik.io/{connection.schema_name}/sources/kerberos/sync/{self.slug}",
194            timeout=0,
195            side_effect=pglock.Return,
196        )
197
198    def get_base_user_properties(self, principal: str, **kwargs):
199        localpart, _ = principal.rsplit("@", 1)
200
201        properties = {
202            "username": localpart,
203            "type": UserTypes.INTERNAL,
204            "path": self.get_user_path(),
205        }
206
207        if "principal_obj" in kwargs:
208            princ_expiry = kwargs["principal_obj"].expire_time
209            properties["is_active"] = princ_expiry is None or princ_expiry > now()
210
211        return properties
212
213    def get_base_group_properties(self, group_id: str, **kwargs):
214        return {
215            "name": group_id,
216        }
217
218    @property
219    def tempdir(self) -> Path:
220        """Get temporary storage for Kerberos files"""
221        path = (
222            Path(gettempdir())
223            / "authentik"
224            / connection.schema_name
225            / "sources"
226            / "kerberos"
227            / str(self.pk)
228        )
229        path.mkdir(mode=0o700, parents=True, exist_ok=True)
230        return path
231
232    @property
233    def krb5_conf_path(self) -> str | None:
234        """Get krb5.conf path"""
235        if not self.krb5_conf:
236            return None
237        conf_path = self.tempdir / "krb5.conf"
238        conf_path.write_text(self.krb5_conf)
239        return str(conf_path)
240
241    def _kadmin_init(self) -> KAdmin | None:
242        variant = KAdm5Variant.MitClient
243        api_version = KAdminApiVersion.Version2
244        match self.kadmin_type:
245            case KAdminType.MIT:
246                variant = KAdm5Variant.MitClient
247                api_version = KAdminApiVersion.Version4
248            case KAdminType.HEIMDAL:
249                variant = KAdm5Variant.HeimdalClient
250                api_version = KAdminApiVersion.Version2
251        # kadmin doesn't use a ccache for its connection
252        # as such, we don't need to create a separate ccache for each source
253        if not self.sync_principal:
254            return None
255        if self.sync_password:
256            return KAdmin.with_password(
257                variant,
258                self.sync_principal,
259                self.sync_password,
260                api_version=api_version,
261            )
262        if self.sync_keytab:
263            keytab = self.sync_keytab
264            if ":" not in keytab:
265                keytab_path = self.tempdir / "kadmin_keytab"
266                keytab_path.touch(mode=0o600)
267                keytab_path.write_bytes(b64decode(self.sync_keytab))
268                keytab = f"FILE:{keytab_path}"
269            return KAdmin.with_keytab(
270                variant,
271                self.sync_principal,
272                keytab,
273                api_version=api_version,
274            )
275        if self.sync_ccache:
276            return KAdmin.with_ccache(
277                variant,
278                self.sync_principal,
279                self.sync_ccache,
280                api_version=api_version,
281            )
282        return None
283
284    def connection(self) -> KAdmin | None:
285        """Get kadmin connection"""
286        if str(self.pk) not in _kadmin_connections:
287            kadm = self._kadmin_init()
288            if kadm is not None:
289                _kadmin_connections[str(self.pk)] = self._kadmin_init()
290        return _kadmin_connections.get(str(self.pk), None)
291
292    def check_connection(self) -> dict[str, str | bool]:
293        """Check Kerberos Connection"""
294        status: dict[str, str | bool] = {"status": "ok"}
295        if not self.sync_users:
296            return status
297        with Krb5ConfContext(self):
298            try:
299                kadm = self.connection()
300                if kadm is None:
301                    status["status"] = "no connection"
302                    return status
303                status["principal_exists"] = kadm.principal_exists(self.sync_principal)
304            except kadmin_exceptions.PyKAdminException as exc:
305                status["status"] = str(exc)
306        return status
307
308    def get_gssapi_store(self) -> dict[str, str]:
309        """Get GSSAPI credentials store for this source"""
310        ccache = self.spnego_ccache
311        keytab = None
312
313        if not ccache:
314            ccache_path = self.tempdir / "spnego_ccache"
315            ccache_path.touch(mode=0o600)
316            ccache = f"FILE:{ccache_path}"
317
318        if self.spnego_keytab:
319            # Keytab is of the form type:residual, use as-is
320            if ":" in self.spnego_keytab:
321                keytab = self.spnego_keytab
322            # Parse the keytab and write it in the file
323            else:
324                keytab_path = self.tempdir / "spnego_keytab"
325                keytab_path.touch(mode=0o600)
326                keytab_path.write_bytes(b64decode(self.spnego_keytab))
327                keytab = f"FILE:{keytab_path}"
328
329        store = {"ccache": ccache}
330        if keytab is not None:
331            store["keytab"] = keytab
332        return store
333
334    def get_gssapi_creds(self) -> gssapi.creds.Credentials | None:
335        """Get GSSAPI credentials for this source"""
336        try:
337            name = None
338            if self.spnego_server_name:
339                # pylint: disable=c-extension-no-member
340                name = gssapi.names.Name(
341                    base=self.spnego_server_name,
342                    name_type=gssapi.raw.types.NameType.hostbased_service,
343                )
344            return gssapi.creds.Credentials(
345                usage="accept", name=name, store=self.get_gssapi_store()
346            )
347        except gssapi.exceptions.GSSError as exc:
348            LOGGER.warning("GSSAPI credentials failure", exc=exc)
349            return None

Federate Kerberos realm with authentik

def realm(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def krb5_conf(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def kadmin_type(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_users(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_users_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_principal(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_keytab(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def sync_ccache(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def spnego_server_name(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def spnego_keytab(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def spnego_ccache(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def password_login_update_internal_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

component: str
119    @property
120    def component(self) -> str:
121        return "ak-source-kerberos-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
123    @property
124    def serializer(self) -> type[Serializer]:
125        from authentik.sources.kerberos.api.source import KerberosSourceSerializer
126
127        return KerberosSourceSerializer

Get serializer for this model

property_mapping_type: type[authentik.core.models.PropertyMapping]
129    @property
130    def property_mapping_type(self) -> type[PropertyMapping]:
131        return KerberosSourcePropertyMapping

Return property mapping type used by this object

icon_url: str
133    @property
134    def icon_url(self) -> str:
135        icon = super().icon_url
136        if not icon:
137            return static("authentik/sources/kerberos.png")
138        return icon

Get the URL to the source icon

schedule_specs: list[authentik.tasks.schedules.common.ScheduleSpec]
140    @property
141    def schedule_specs(self) -> list[ScheduleSpec]:
142        from authentik.sources.kerberos.tasks import kerberos_connectivity_check, kerberos_sync
143
144        return [
145            ScheduleSpec(
146                actor=kerberos_sync,
147                uid=self.slug,
148                args=(self.pk,),
149                crontab=f"{fqdn_rand('kerberos_sync/' + str(self.pk))} */2 * * *",
150                send_on_save=True,
151            ),
152            ScheduleSpec(
153                actor=kerberos_connectivity_check,
154                uid=self.slug,
155                args=(self.pk,),
156                crontab=f"{fqdn_rand('kerberos_connectivity_check/' + str(self.pk))} * * * *",
157                send_on_save=True,
158            ),
159        ]
def ui_login_button( self, request: django.http.request.HttpRequest) -> authentik.core.types.UILoginButton:
161    def ui_login_button(self, request: HttpRequest) -> UILoginButton:
162        return UILoginButton(
163            challenge=RedirectChallenge(
164                data={
165                    "to": reverse(
166                        "authentik_sources_kerberos:spnego-login",
167                        kwargs={"source_slug": self.slug},
168                    ),
169                }
170            ),
171            name=self.name,
172            icon_url=self.icon_url,
173            promoted=self.promoted,
174        )

If source uses a http-based flow, return UI Information about the login button. If source doesn't use http-based flow, return None.

def ui_user_settings(self) -> authentik.core.types.UserSettingSerializer | None:
176    def ui_user_settings(self) -> UserSettingSerializer | None:
177        return UserSettingSerializer(
178            data={
179                "title": self.name,
180                "component": "ak-user-settings-source-kerberos",
181                "configure_url": reverse(
182                    "authentik_sources_kerberos:spnego-login",
183                    kwargs={"source_slug": self.slug},
184                ),
185                "icon_url": self.icon_url,
186            }
187        )

Entry point for integrating with user settings. Returns None if no user settings are available, otherwise a UserSettingSerializer.

sync_lock: pglock.core.advisory
189    @property
190    def sync_lock(self) -> pglock.advisory:
191        """Lock for syncing Kerberos to prevent multiple parallel syncs happening"""
192        return pglock.advisory(
193            lock_id=f"goauthentik.io/{connection.schema_name}/sources/kerberos/sync/{self.slug}",
194            timeout=0,
195            side_effect=pglock.Return,
196        )

Lock for syncing Kerberos to prevent multiple parallel syncs happening

def get_base_user_properties(self, principal: str, **kwargs):
198    def get_base_user_properties(self, principal: str, **kwargs):
199        localpart, _ = principal.rsplit("@", 1)
200
201        properties = {
202            "username": localpart,
203            "type": UserTypes.INTERNAL,
204            "path": self.get_user_path(),
205        }
206
207        if "principal_obj" in kwargs:
208            princ_expiry = kwargs["principal_obj"].expire_time
209            properties["is_active"] = princ_expiry is None or princ_expiry > now()
210
211        return properties

Get base properties for a user to build final properties upon.

def get_base_group_properties(self, group_id: str, **kwargs):
213    def get_base_group_properties(self, group_id: str, **kwargs):
214        return {
215            "name": group_id,
216        }

Get base properties for a group to build final properties upon.

tempdir: pathlib.Path
218    @property
219    def tempdir(self) -> Path:
220        """Get temporary storage for Kerberos files"""
221        path = (
222            Path(gettempdir())
223            / "authentik"
224            / connection.schema_name
225            / "sources"
226            / "kerberos"
227            / str(self.pk)
228        )
229        path.mkdir(mode=0o700, parents=True, exist_ok=True)
230        return path

Get temporary storage for Kerberos files

krb5_conf_path: str | None
232    @property
233    def krb5_conf_path(self) -> str | None:
234        """Get krb5.conf path"""
235        if not self.krb5_conf:
236            return None
237        conf_path = self.tempdir / "krb5.conf"
238        conf_path.write_text(self.krb5_conf)
239        return str(conf_path)

Get krb5.conf path

def connection(self) -> KAdmin | None:
284    def connection(self) -> KAdmin | None:
285        """Get kadmin connection"""
286        if str(self.pk) not in _kadmin_connections:
287            kadm = self._kadmin_init()
288            if kadm is not None:
289                _kadmin_connections[str(self.pk)] = self._kadmin_init()
290        return _kadmin_connections.get(str(self.pk), None)

Get kadmin connection

def check_connection(self) -> dict[str, str | bool]:
292    def check_connection(self) -> dict[str, str | bool]:
293        """Check Kerberos Connection"""
294        status: dict[str, str | bool] = {"status": "ok"}
295        if not self.sync_users:
296            return status
297        with Krb5ConfContext(self):
298            try:
299                kadm = self.connection()
300                if kadm is None:
301                    status["status"] = "no connection"
302                    return status
303                status["principal_exists"] = kadm.principal_exists(self.sync_principal)
304            except kadmin_exceptions.PyKAdminException as exc:
305                status["status"] = str(exc)
306        return status

Check Kerberos Connection

def get_gssapi_store(self) -> dict[str, str]:
308    def get_gssapi_store(self) -> dict[str, str]:
309        """Get GSSAPI credentials store for this source"""
310        ccache = self.spnego_ccache
311        keytab = None
312
313        if not ccache:
314            ccache_path = self.tempdir / "spnego_ccache"
315            ccache_path.touch(mode=0o600)
316            ccache = f"FILE:{ccache_path}"
317
318        if self.spnego_keytab:
319            # Keytab is of the form type:residual, use as-is
320            if ":" in self.spnego_keytab:
321                keytab = self.spnego_keytab
322            # Parse the keytab and write it in the file
323            else:
324                keytab_path = self.tempdir / "spnego_keytab"
325                keytab_path.touch(mode=0o600)
326                keytab_path.write_bytes(b64decode(self.spnego_keytab))
327                keytab = f"FILE:{keytab_path}"
328
329        store = {"ccache": ccache}
330        if keytab is not None:
331            store["keytab"] = keytab
332        return store

Get GSSAPI credentials store for this source

def get_gssapi_creds(self) -> gssapi.creds.Credentials | None:
334    def get_gssapi_creds(self) -> gssapi.creds.Credentials | None:
335        """Get GSSAPI credentials for this source"""
336        try:
337            name = None
338            if self.spnego_server_name:
339                # pylint: disable=c-extension-no-member
340                name = gssapi.names.Name(
341                    base=self.spnego_server_name,
342                    name_type=gssapi.raw.types.NameType.hostbased_service,
343                )
344            return gssapi.creds.Credentials(
345                usage="accept", name=name, store=self.get_gssapi_store()
346            )
347        except gssapi.exceptions.GSSError as exc:
348            LOGGER.warning("GSSAPI credentials failure", exc=exc)
349            return None

Get GSSAPI credentials for this source

def sync_outgoing_trigger_mode(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

schedules

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

def get_kadmin_type_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

source_ptr_id
source_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

def get_sync_outgoing_trigger_mode_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

class KerberosSource.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class KerberosSource.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class Krb5ConfContext:
class Krb5ConfContext:
    """
    Context manager to set the path to the krb5.conf config file.

    While the context is active, the `KRB5_CONFIG` environment variable points
    at the source's custom krb5.conf. On exit the previous value is restored,
    or the variable is removed when it was not set before. Sources without a
    custom krb5.conf leave the environment untouched.

    `os.environ` is shared by the whole process, so the override is visible to
    everything running in it while the context is active.
    """

    def __init__(self, source: "KerberosSource"):
        self._source = source
        # Resolved once, at construction time; a falsy path disables the override.
        self._path = self._source.krb5_conf_path
        self._previous = None

    def __enter__(self):
        if not self._path:
            return self
        self._previous = os.environ.get("KRB5_CONFIG")
        os.environ["KRB5_CONFIG"] = self._path
        return self

    def __exit__(self, *args, **kwargs):
        if not self._path:
            return
        # Compare against None so a previously empty value is restored as-is
        # rather than being treated as "unset".
        if self._previous is not None:
            os.environ["KRB5_CONFIG"] = self._previous
        else:
            # pop() tolerates the variable having been removed inside the context.
            os.environ.pop("KRB5_CONFIG", None)

Context manager to set the path to the krb5.conf config file.

Krb5ConfContext(source: KerberosSource)
357    def __init__(self, source: KerberosSource):
358        self._source = source
359        self._path = self._source.krb5_conf_path
360        self._previous = None
class KerberosSourcePropertyMapping(authentik.core.models.PropertyMapping):
class KerberosSourcePropertyMapping(PropertyMapping):
    """Map Kerberos Property to User object attribute"""

    class Meta:
        verbose_name = _("Kerberos Source Property Mapping")
        verbose_name_plural = _("Kerberos Source Property Mappings")

    def __str__(self):
        """Represent the mapping by its name"""
        return str(self.name)

    @property
    def component(self) -> str:
        """Name of the UI component used to edit this mapping"""
        return "ak-property-mapping-source-kerberos-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for this mapping"""
        # Imported on access rather than at module level, presumably to avoid
        # a circular import with the API module -- TODO confirm.
        from authentik.sources.kerberos.api.property_mappings import (
            KerberosSourcePropertyMappingSerializer as serializer_cls,
        )

        return serializer_cls

Map Kerberos Property to User object attribute

component: str
380    @property
381    def component(self) -> str:
382        return "ak-property-mapping-source-kerberos-form"

Return component used to edit this object

serializer: type[rest_framework.serializers.Serializer]
384    @property
385    def serializer(self) -> type[Serializer]:
386        from authentik.sources.kerberos.api.property_mappings import (
387            KerberosSourcePropertyMappingSerializer,
388        )
389
390        return KerberosSourcePropertyMappingSerializer

Get serializer for this model

propertymapping_ptr_id
propertymapping_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class KerberosSourcePropertyMapping.DoesNotExist(authentik.core.models.PropertyMapping.DoesNotExist):

The requested object does not exist

class KerberosSourcePropertyMapping.MultipleObjectsReturned(authentik.core.models.PropertyMapping.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class UserKerberosSourceConnection(authentik.core.models.UserSourceConnection):
class UserKerberosSourceConnection(UserSourceConnection):
    """Connection to configured Kerberos Sources."""

    class Meta:
        verbose_name = _("User Kerberos Source Connection")
        verbose_name_plural = _("User Kerberos Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for user connections to a Kerberos source"""
        # Imported on access rather than at module level, presumably to avoid
        # a circular import with the API module -- TODO confirm.
        from authentik.sources.kerberos.api.source_connection import (
            UserKerberosSourceConnectionSerializer as serializer_cls,
        )

        return serializer_cls

Connection to configured Kerberos Sources.

serializer: type[rest_framework.serializers.Serializer]
403    @property
404    def serializer(self) -> type[Serializer]:
405        from authentik.sources.kerberos.api.source_connection import (
406            UserKerberosSourceConnectionSerializer,
407        )
408
409        return UserKerberosSourceConnectionSerializer

Get serializer for this model

usersourceconnection_ptr_id
usersourceconnection_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class UserKerberosSourceConnection.DoesNotExist(authentik.core.models.UserSourceConnection.DoesNotExist):

The requested object does not exist

class UserKerberosSourceConnection.MultipleObjectsReturned(authentik.core.models.UserSourceConnection.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class GroupKerberosSourceConnection(authentik.core.models.GroupSourceConnection):
class GroupKerberosSourceConnection(GroupSourceConnection):
    """Connection to configured Kerberos Sources."""

    class Meta:
        verbose_name = _("Group Kerberos Source Connection")
        verbose_name_plural = _("Group Kerberos Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Serializer class for group connections to a Kerberos source"""
        # Imported on access rather than at module level, presumably to avoid
        # a circular import with the API module -- TODO confirm.
        from authentik.sources.kerberos.api.source_connection import (
            GroupKerberosSourceConnectionSerializer as serializer_cls,
        )

        return serializer_cls

Connection to configured Kerberos Sources.

serializer: type[rest_framework.serializers.Serializer]
419    @property
420    def serializer(self) -> type[Serializer]:
421        from authentik.sources.kerberos.api.source_connection import (
422            GroupKerberosSourceConnectionSerializer,
423        )
424
425        return GroupKerberosSourceConnectionSerializer

Get serializer for this model

groupsourceconnection_ptr_id
groupsourceconnection_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class GroupKerberosSourceConnection.DoesNotExist(authentik.core.models.GroupSourceConnection.DoesNotExist):

The requested object does not exist

class GroupKerberosSourceConnection.MultipleObjectsReturned(authentik.core.models.GroupSourceConnection.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.