authentik.sources.kerberos.models
authentik Kerberos Source Models
1"""authentik Kerberos Source Models""" 2 3import os 4from base64 import b64decode 5from pathlib import Path 6from tempfile import gettempdir 7from typing import Any 8 9import gssapi 10import pglock 11from django.db import connection, models 12from django.http import HttpRequest 13from django.shortcuts import reverse 14from django.templatetags.static import static 15from django.utils.timezone import now 16from django.utils.translation import gettext_lazy as _ 17from kadmin import KAdm5Variant, KAdmin, KAdminApiVersion 18from kadmin import exceptions as kadmin_exceptions 19from rest_framework.serializers import Serializer 20from structlog.stdlib import get_logger 21 22from authentik.core.models import ( 23 GroupSourceConnection, 24 PropertyMapping, 25 UserSourceConnection, 26 UserTypes, 27) 28from authentik.core.types import UILoginButton, UserSettingSerializer 29from authentik.flows.challenge import RedirectChallenge 30from authentik.lib.sync.incoming.models import IncomingSyncSource 31from authentik.lib.utils.time import fqdn_rand 32from authentik.tasks.schedules.common import ScheduleSpec 33 34LOGGER = get_logger() 35 36 37# Creating kadmin connections is expensive. As such, this global is used to reuse 38# existing kadmin connections instead of creating new ones 39_kadmin_connections: dict[str, Any] = {} 40 41 42class KAdminType(models.TextChoices): 43 MIT = "MIT" 44 HEIMDAL = "Heimdal" 45 46 47class KerberosSource(IncomingSyncSource): 48 """Federate Kerberos realm with authentik""" 49 50 realm = models.TextField(help_text=_("Kerberos realm"), unique=True) 51 krb5_conf = models.TextField( 52 blank=True, 53 help_text=_("Custom krb5.conf to use. 
Uses the system one by default"), 54 ) 55 kadmin_type = models.TextField( 56 choices=KAdminType.choices, default=KAdminType.MIT, help_text=_("KAdmin server type") 57 ) 58 59 sync_users = models.BooleanField( 60 default=False, help_text=_("Sync users from Kerberos into authentik"), db_index=True 61 ) 62 sync_users_password = models.BooleanField( 63 default=True, 64 help_text=_("When a user changes their password, sync it back to Kerberos"), 65 db_index=True, 66 ) 67 sync_principal = models.TextField( 68 help_text=_("Principal to authenticate to kadmin for sync."), blank=True 69 ) 70 sync_password = models.TextField( 71 help_text=_("Password to authenticate to kadmin for sync"), blank=True 72 ) 73 sync_keytab = models.TextField( 74 help_text=_( 75 "Keytab to authenticate to kadmin for sync. " 76 "Must be base64-encoded or in the form TYPE:residual" 77 ), 78 blank=True, 79 ) 80 sync_ccache = models.TextField( 81 help_text=_( 82 "Credentials cache to authenticate to kadmin for sync. " 83 "Must be in the form TYPE:residual" 84 ), 85 blank=True, 86 ) 87 88 spnego_server_name = models.TextField( 89 help_text=_( 90 "Force the use of a specific server name for SPNEGO. 
Must be in the form HTTP@hostname" 91 ), 92 blank=True, 93 ) 94 spnego_keytab = models.TextField( 95 help_text=_("SPNEGO keytab base64-encoded or path to keytab in the form FILE:path"), 96 blank=True, 97 ) 98 spnego_ccache = models.TextField( 99 help_text=_("Credential cache to use for SPNEGO in form type:residual"), 100 blank=True, 101 ) 102 103 password_login_update_internal_password = models.BooleanField( 104 default=False, 105 help_text=_( 106 "If enabled, the authentik-stored password will be updated upon " 107 "login with the Kerberos password backend" 108 ), 109 ) 110 111 class Meta: 112 verbose_name = _("Kerberos Source") 113 verbose_name_plural = _("Kerberos Sources") 114 115 def __str__(self): 116 return f"Kerberos Source {self.name}" 117 118 @property 119 def component(self) -> str: 120 return "ak-source-kerberos-form" 121 122 @property 123 def serializer(self) -> type[Serializer]: 124 from authentik.sources.kerberos.api.source import KerberosSourceSerializer 125 126 return KerberosSourceSerializer 127 128 @property 129 def property_mapping_type(self) -> type[PropertyMapping]: 130 return KerberosSourcePropertyMapping 131 132 @property 133 def icon_url(self) -> str: 134 icon = super().icon_url 135 if not icon: 136 return static("authentik/sources/kerberos.png") 137 return icon 138 139 @property 140 def schedule_specs(self) -> list[ScheduleSpec]: 141 from authentik.sources.kerberos.tasks import kerberos_connectivity_check, kerberos_sync 142 143 return [ 144 ScheduleSpec( 145 actor=kerberos_sync, 146 uid=self.slug, 147 args=(self.pk,), 148 crontab=f"{fqdn_rand('kerberos_sync/' + str(self.pk))} */2 * * *", 149 send_on_save=True, 150 ), 151 ScheduleSpec( 152 actor=kerberos_connectivity_check, 153 uid=self.slug, 154 args=(self.pk,), 155 crontab=f"{fqdn_rand('kerberos_connectivity_check/' + str(self.pk))} * * * *", 156 send_on_save=True, 157 ), 158 ] 159 160 def ui_login_button(self, request: HttpRequest) -> UILoginButton: 161 return UILoginButton( 162 
challenge=RedirectChallenge( 163 data={ 164 "to": reverse( 165 "authentik_sources_kerberos:spnego-login", 166 kwargs={"source_slug": self.slug}, 167 ), 168 } 169 ), 170 name=self.name, 171 icon_url=self.icon_url, 172 promoted=self.promoted, 173 ) 174 175 def ui_user_settings(self) -> UserSettingSerializer | None: 176 return UserSettingSerializer( 177 data={ 178 "title": self.name, 179 "component": "ak-user-settings-source-kerberos", 180 "configure_url": reverse( 181 "authentik_sources_kerberos:spnego-login", 182 kwargs={"source_slug": self.slug}, 183 ), 184 "icon_url": self.icon_url, 185 } 186 ) 187 188 @property 189 def sync_lock(self) -> pglock.advisory: 190 """Lock for syncing Kerberos to prevent multiple parallel syncs happening""" 191 return pglock.advisory( 192 lock_id=f"goauthentik.io/{connection.schema_name}/sources/kerberos/sync/{self.slug}", 193 timeout=0, 194 side_effect=pglock.Return, 195 ) 196 197 def get_base_user_properties(self, principal: str, **kwargs): 198 localpart, _ = principal.rsplit("@", 1) 199 200 properties = { 201 "username": localpart, 202 "type": UserTypes.INTERNAL, 203 "path": self.get_user_path(), 204 } 205 206 if "principal_obj" in kwargs: 207 princ_expiry = kwargs["principal_obj"].expire_time 208 properties["is_active"] = princ_expiry is None or princ_expiry > now() 209 210 return properties 211 212 def get_base_group_properties(self, group_id: str, **kwargs): 213 return { 214 "name": group_id, 215 } 216 217 @property 218 def tempdir(self) -> Path: 219 """Get temporary storage for Kerberos files""" 220 path = ( 221 Path(gettempdir()) 222 / "authentik" 223 / connection.schema_name 224 / "sources" 225 / "kerberos" 226 / str(self.pk) 227 ) 228 path.mkdir(mode=0o700, parents=True, exist_ok=True) 229 return path 230 231 @property 232 def krb5_conf_path(self) -> str | None: 233 """Get krb5.conf path""" 234 if not self.krb5_conf: 235 return None 236 conf_path = self.tempdir / "krb5.conf" 237 conf_path.write_text(self.krb5_conf) 238 return 
str(conf_path) 239 240 def _kadmin_init(self) -> KAdmin | None: 241 variant = KAdm5Variant.MitClient 242 api_version = KAdminApiVersion.Version2 243 match self.kadmin_type: 244 case KAdminType.MIT: 245 variant = KAdm5Variant.MitClient 246 api_version = KAdminApiVersion.Version4 247 case KAdminType.HEIMDAL: 248 variant = KAdm5Variant.HeimdalClient 249 api_version = KAdminApiVersion.Version2 250 # kadmin doesn't use a ccache for its connection 251 # as such, we don't need to create a separate ccache for each source 252 if not self.sync_principal: 253 return None 254 if self.sync_password: 255 return KAdmin.with_password( 256 variant, 257 self.sync_principal, 258 self.sync_password, 259 api_version=api_version, 260 ) 261 if self.sync_keytab: 262 keytab = self.sync_keytab 263 if ":" not in keytab: 264 keytab_path = self.tempdir / "kadmin_keytab" 265 keytab_path.touch(mode=0o600) 266 keytab_path.write_bytes(b64decode(self.sync_keytab)) 267 keytab = f"FILE:{keytab_path}" 268 return KAdmin.with_keytab( 269 variant, 270 self.sync_principal, 271 keytab, 272 api_version=api_version, 273 ) 274 if self.sync_ccache: 275 return KAdmin.with_ccache( 276 variant, 277 self.sync_principal, 278 self.sync_ccache, 279 api_version=api_version, 280 ) 281 return None 282 283 def connection(self) -> KAdmin | None: 284 """Get kadmin connection""" 285 if str(self.pk) not in _kadmin_connections: 286 kadm = self._kadmin_init() 287 if kadm is not None: 288 _kadmin_connections[str(self.pk)] = self._kadmin_init() 289 return _kadmin_connections.get(str(self.pk), None) 290 291 def check_connection(self) -> dict[str, str | bool]: 292 """Check Kerberos Connection""" 293 status: dict[str, str | bool] = {"status": "ok"} 294 if not self.sync_users: 295 return status 296 with Krb5ConfContext(self): 297 try: 298 kadm = self.connection() 299 if kadm is None: 300 status["status"] = "no connection" 301 return status 302 status["principal_exists"] = kadm.principal_exists(self.sync_principal) 303 except 
kadmin_exceptions.PyKAdminException as exc: 304 status["status"] = str(exc) 305 return status 306 307 def get_gssapi_store(self) -> dict[str, str]: 308 """Get GSSAPI credentials store for this source""" 309 ccache = self.spnego_ccache 310 keytab = None 311 312 if not ccache: 313 ccache_path = self.tempdir / "spnego_ccache" 314 ccache_path.touch(mode=0o600) 315 ccache = f"FILE:{ccache_path}" 316 317 if self.spnego_keytab: 318 # Keytab is of the form type:residual, use as-is 319 if ":" in self.spnego_keytab: 320 keytab = self.spnego_keytab 321 # Parse the keytab and write it in the file 322 else: 323 keytab_path = self.tempdir / "spnego_keytab" 324 keytab_path.touch(mode=0o600) 325 keytab_path.write_bytes(b64decode(self.spnego_keytab)) 326 keytab = f"FILE:{keytab_path}" 327 328 store = {"ccache": ccache} 329 if keytab is not None: 330 store["keytab"] = keytab 331 return store 332 333 def get_gssapi_creds(self) -> gssapi.creds.Credentials | None: 334 """Get GSSAPI credentials for this source""" 335 try: 336 name = None 337 if self.spnego_server_name: 338 # pylint: disable=c-extension-no-member 339 name = gssapi.names.Name( 340 base=self.spnego_server_name, 341 name_type=gssapi.raw.types.NameType.hostbased_service, 342 ) 343 return gssapi.creds.Credentials( 344 usage="accept", name=name, store=self.get_gssapi_store() 345 ) 346 except gssapi.exceptions.GSSError as exc: 347 LOGGER.warning("GSSAPI credentials failure", exc=exc) 348 return None 349 350 351class Krb5ConfContext: 352 """ 353 Context manager to set the path to the krb5.conf config file. 
354 """ 355 356 def __init__(self, source: KerberosSource): 357 self._source = source 358 self._path = self._source.krb5_conf_path 359 self._previous = None 360 361 def __enter__(self): 362 if not self._path: 363 return 364 self._previous = os.environ.get("KRB5_CONFIG", None) 365 os.environ["KRB5_CONFIG"] = self._path 366 367 def __exit__(self, *args, **kwargs): 368 if not self._path: 369 return 370 if self._previous: 371 os.environ["KRB5_CONFIG"] = self._previous 372 else: 373 del os.environ["KRB5_CONFIG"] 374 375 376class KerberosSourcePropertyMapping(PropertyMapping): 377 """Map Kerberos Property to User object attribute""" 378 379 @property 380 def component(self) -> str: 381 return "ak-property-mapping-source-kerberos-form" 382 383 @property 384 def serializer(self) -> type[Serializer]: 385 from authentik.sources.kerberos.api.property_mappings import ( 386 KerberosSourcePropertyMappingSerializer, 387 ) 388 389 return KerberosSourcePropertyMappingSerializer 390 391 def __str__(self): 392 return str(self.name) 393 394 class Meta: 395 verbose_name = _("Kerberos Source Property Mapping") 396 verbose_name_plural = _("Kerberos Source Property Mappings") 397 398 399class UserKerberosSourceConnection(UserSourceConnection): 400 """Connection to configured Kerberos Sources.""" 401 402 @property 403 def serializer(self) -> type[Serializer]: 404 from authentik.sources.kerberos.api.source_connection import ( 405 UserKerberosSourceConnectionSerializer, 406 ) 407 408 return UserKerberosSourceConnectionSerializer 409 410 class Meta: 411 verbose_name = _("User Kerberos Source Connection") 412 verbose_name_plural = _("User Kerberos Source Connections") 413 414 415class GroupKerberosSourceConnection(GroupSourceConnection): 416 """Connection to configured Kerberos Sources.""" 417 418 @property 419 def serializer(self) -> type[Serializer]: 420 from authentik.sources.kerberos.api.source_connection import ( 421 GroupKerberosSourceConnectionSerializer, 422 ) 423 424 return 
GroupKerberosSourceConnectionSerializer 425 426 class Meta: 427 verbose_name = _("Group Kerberos Source Connection") 428 verbose_name_plural = _("Group Kerberos Source Connections")
Class for creating enumerated string choices.
48class KerberosSource(IncomingSyncSource): 49 """Federate Kerberos realm with authentik""" 50 51 realm = models.TextField(help_text=_("Kerberos realm"), unique=True) 52 krb5_conf = models.TextField( 53 blank=True, 54 help_text=_("Custom krb5.conf to use. Uses the system one by default"), 55 ) 56 kadmin_type = models.TextField( 57 choices=KAdminType.choices, default=KAdminType.MIT, help_text=_("KAdmin server type") 58 ) 59 60 sync_users = models.BooleanField( 61 default=False, help_text=_("Sync users from Kerberos into authentik"), db_index=True 62 ) 63 sync_users_password = models.BooleanField( 64 default=True, 65 help_text=_("When a user changes their password, sync it back to Kerberos"), 66 db_index=True, 67 ) 68 sync_principal = models.TextField( 69 help_text=_("Principal to authenticate to kadmin for sync."), blank=True 70 ) 71 sync_password = models.TextField( 72 help_text=_("Password to authenticate to kadmin for sync"), blank=True 73 ) 74 sync_keytab = models.TextField( 75 help_text=_( 76 "Keytab to authenticate to kadmin for sync. " 77 "Must be base64-encoded or in the form TYPE:residual" 78 ), 79 blank=True, 80 ) 81 sync_ccache = models.TextField( 82 help_text=_( 83 "Credentials cache to authenticate to kadmin for sync. " 84 "Must be in the form TYPE:residual" 85 ), 86 blank=True, 87 ) 88 89 spnego_server_name = models.TextField( 90 help_text=_( 91 "Force the use of a specific server name for SPNEGO. 
Must be in the form HTTP@hostname" 92 ), 93 blank=True, 94 ) 95 spnego_keytab = models.TextField( 96 help_text=_("SPNEGO keytab base64-encoded or path to keytab in the form FILE:path"), 97 blank=True, 98 ) 99 spnego_ccache = models.TextField( 100 help_text=_("Credential cache to use for SPNEGO in form type:residual"), 101 blank=True, 102 ) 103 104 password_login_update_internal_password = models.BooleanField( 105 default=False, 106 help_text=_( 107 "If enabled, the authentik-stored password will be updated upon " 108 "login with the Kerberos password backend" 109 ), 110 ) 111 112 class Meta: 113 verbose_name = _("Kerberos Source") 114 verbose_name_plural = _("Kerberos Sources") 115 116 def __str__(self): 117 return f"Kerberos Source {self.name}" 118 119 @property 120 def component(self) -> str: 121 return "ak-source-kerberos-form" 122 123 @property 124 def serializer(self) -> type[Serializer]: 125 from authentik.sources.kerberos.api.source import KerberosSourceSerializer 126 127 return KerberosSourceSerializer 128 129 @property 130 def property_mapping_type(self) -> type[PropertyMapping]: 131 return KerberosSourcePropertyMapping 132 133 @property 134 def icon_url(self) -> str: 135 icon = super().icon_url 136 if not icon: 137 return static("authentik/sources/kerberos.png") 138 return icon 139 140 @property 141 def schedule_specs(self) -> list[ScheduleSpec]: 142 from authentik.sources.kerberos.tasks import kerberos_connectivity_check, kerberos_sync 143 144 return [ 145 ScheduleSpec( 146 actor=kerberos_sync, 147 uid=self.slug, 148 args=(self.pk,), 149 crontab=f"{fqdn_rand('kerberos_sync/' + str(self.pk))} */2 * * *", 150 send_on_save=True, 151 ), 152 ScheduleSpec( 153 actor=kerberos_connectivity_check, 154 uid=self.slug, 155 args=(self.pk,), 156 crontab=f"{fqdn_rand('kerberos_connectivity_check/' + str(self.pk))} * * * *", 157 send_on_save=True, 158 ), 159 ] 160 161 def ui_login_button(self, request: HttpRequest) -> UILoginButton: 162 return UILoginButton( 163 
challenge=RedirectChallenge( 164 data={ 165 "to": reverse( 166 "authentik_sources_kerberos:spnego-login", 167 kwargs={"source_slug": self.slug}, 168 ), 169 } 170 ), 171 name=self.name, 172 icon_url=self.icon_url, 173 promoted=self.promoted, 174 ) 175 176 def ui_user_settings(self) -> UserSettingSerializer | None: 177 return UserSettingSerializer( 178 data={ 179 "title": self.name, 180 "component": "ak-user-settings-source-kerberos", 181 "configure_url": reverse( 182 "authentik_sources_kerberos:spnego-login", 183 kwargs={"source_slug": self.slug}, 184 ), 185 "icon_url": self.icon_url, 186 } 187 ) 188 189 @property 190 def sync_lock(self) -> pglock.advisory: 191 """Lock for syncing Kerberos to prevent multiple parallel syncs happening""" 192 return pglock.advisory( 193 lock_id=f"goauthentik.io/{connection.schema_name}/sources/kerberos/sync/{self.slug}", 194 timeout=0, 195 side_effect=pglock.Return, 196 ) 197 198 def get_base_user_properties(self, principal: str, **kwargs): 199 localpart, _ = principal.rsplit("@", 1) 200 201 properties = { 202 "username": localpart, 203 "type": UserTypes.INTERNAL, 204 "path": self.get_user_path(), 205 } 206 207 if "principal_obj" in kwargs: 208 princ_expiry = kwargs["principal_obj"].expire_time 209 properties["is_active"] = princ_expiry is None or princ_expiry > now() 210 211 return properties 212 213 def get_base_group_properties(self, group_id: str, **kwargs): 214 return { 215 "name": group_id, 216 } 217 218 @property 219 def tempdir(self) -> Path: 220 """Get temporary storage for Kerberos files""" 221 path = ( 222 Path(gettempdir()) 223 / "authentik" 224 / connection.schema_name 225 / "sources" 226 / "kerberos" 227 / str(self.pk) 228 ) 229 path.mkdir(mode=0o700, parents=True, exist_ok=True) 230 return path 231 232 @property 233 def krb5_conf_path(self) -> str | None: 234 """Get krb5.conf path""" 235 if not self.krb5_conf: 236 return None 237 conf_path = self.tempdir / "krb5.conf" 238 conf_path.write_text(self.krb5_conf) 239 return 
str(conf_path) 240 241 def _kadmin_init(self) -> KAdmin | None: 242 variant = KAdm5Variant.MitClient 243 api_version = KAdminApiVersion.Version2 244 match self.kadmin_type: 245 case KAdminType.MIT: 246 variant = KAdm5Variant.MitClient 247 api_version = KAdminApiVersion.Version4 248 case KAdminType.HEIMDAL: 249 variant = KAdm5Variant.HeimdalClient 250 api_version = KAdminApiVersion.Version2 251 # kadmin doesn't use a ccache for its connection 252 # as such, we don't need to create a separate ccache for each source 253 if not self.sync_principal: 254 return None 255 if self.sync_password: 256 return KAdmin.with_password( 257 variant, 258 self.sync_principal, 259 self.sync_password, 260 api_version=api_version, 261 ) 262 if self.sync_keytab: 263 keytab = self.sync_keytab 264 if ":" not in keytab: 265 keytab_path = self.tempdir / "kadmin_keytab" 266 keytab_path.touch(mode=0o600) 267 keytab_path.write_bytes(b64decode(self.sync_keytab)) 268 keytab = f"FILE:{keytab_path}" 269 return KAdmin.with_keytab( 270 variant, 271 self.sync_principal, 272 keytab, 273 api_version=api_version, 274 ) 275 if self.sync_ccache: 276 return KAdmin.with_ccache( 277 variant, 278 self.sync_principal, 279 self.sync_ccache, 280 api_version=api_version, 281 ) 282 return None 283 284 def connection(self) -> KAdmin | None: 285 """Get kadmin connection""" 286 if str(self.pk) not in _kadmin_connections: 287 kadm = self._kadmin_init() 288 if kadm is not None: 289 _kadmin_connections[str(self.pk)] = self._kadmin_init() 290 return _kadmin_connections.get(str(self.pk), None) 291 292 def check_connection(self) -> dict[str, str | bool]: 293 """Check Kerberos Connection""" 294 status: dict[str, str | bool] = {"status": "ok"} 295 if not self.sync_users: 296 return status 297 with Krb5ConfContext(self): 298 try: 299 kadm = self.connection() 300 if kadm is None: 301 status["status"] = "no connection" 302 return status 303 status["principal_exists"] = kadm.principal_exists(self.sync_principal) 304 except 
kadmin_exceptions.PyKAdminException as exc: 305 status["status"] = str(exc) 306 return status 307 308 def get_gssapi_store(self) -> dict[str, str]: 309 """Get GSSAPI credentials store for this source""" 310 ccache = self.spnego_ccache 311 keytab = None 312 313 if not ccache: 314 ccache_path = self.tempdir / "spnego_ccache" 315 ccache_path.touch(mode=0o600) 316 ccache = f"FILE:{ccache_path}" 317 318 if self.spnego_keytab: 319 # Keytab is of the form type:residual, use as-is 320 if ":" in self.spnego_keytab: 321 keytab = self.spnego_keytab 322 # Parse the keytab and write it in the file 323 else: 324 keytab_path = self.tempdir / "spnego_keytab" 325 keytab_path.touch(mode=0o600) 326 keytab_path.write_bytes(b64decode(self.spnego_keytab)) 327 keytab = f"FILE:{keytab_path}" 328 329 store = {"ccache": ccache} 330 if keytab is not None: 331 store["keytab"] = keytab 332 return store 333 334 def get_gssapi_creds(self) -> gssapi.creds.Credentials | None: 335 """Get GSSAPI credentials for this source""" 336 try: 337 name = None 338 if self.spnego_server_name: 339 # pylint: disable=c-extension-no-member 340 name = gssapi.names.Name( 341 base=self.spnego_server_name, 342 name_type=gssapi.raw.types.NameType.hostbased_service, 343 ) 344 return gssapi.creds.Credentials( 345 usage="accept", name=name, store=self.get_gssapi_store() 346 ) 347 except gssapi.exceptions.GSSError as exc: 348 LOGGER.warning("GSSAPI credentials failure", exc=exc) 349 return None
Federate Kerberos realm with authentik
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[Serializer]:
    """Return the serializer class used for Kerberos sources."""
    # Function-scope import, kept as in the original
    # (presumably avoids an import cycle with the API module; confirm).
    from authentik.sources.kerberos.api.source import (
        KerberosSourceSerializer as serializer_class,
    )

    return serializer_class
Get serializer for this model
@property
def property_mapping_type(self) -> type[PropertyMapping]:
    """Return the property mapping model used by Kerberos sources."""
    mapping_type = KerberosSourcePropertyMapping
    return mapping_type
Return property mapping type used by this object
@property
def icon_url(self) -> str:
    """Return the configured icon URL, or the bundled Kerberos icon when none is set."""
    return super().icon_url or static("authentik/sources/kerberos.png")
Get the URL to the source icon
@property
def schedule_specs(self) -> list[ScheduleSpec]:
    """Return the schedules attached to this source.

    Two specs are produced: the sync task (every second hour) and the
    connectivity check (every hour). The minute field of each crontab comes
    from `fqdn_rand`, keyed on the task name and this source's primary key.
    """
    from authentik.sources.kerberos.tasks import kerberos_connectivity_check, kerberos_sync

    sync_spec = ScheduleSpec(
        actor=kerberos_sync,
        uid=self.slug,
        args=(self.pk,),
        crontab=f"{fqdn_rand('kerberos_sync/' + str(self.pk))} */2 * * *",
        send_on_save=True,
    )
    connectivity_spec = ScheduleSpec(
        actor=kerberos_connectivity_check,
        uid=self.slug,
        args=(self.pk,),
        crontab=f"{fqdn_rand('kerberos_connectivity_check/' + str(self.pk))} * * * *",
        send_on_save=True,
    )
    return [sync_spec, connectivity_spec]
def ui_user_settings(self) -> UserSettingSerializer | None:
    """Build the user-settings entry for this source.

    The configure URL is this source's SPNEGO login view.
    """
    settings_data = {
        "title": self.name,
        "component": "ak-user-settings-source-kerberos",
        "configure_url": reverse(
            "authentik_sources_kerberos:spnego-login",
            kwargs={"source_slug": self.slug},
        ),
        "icon_url": self.icon_url,
    }
    return UserSettingSerializer(data=settings_data)
Entrypoint to integrate with User settings. Can either return None if no user settings are available, or UserSettingSerializer.
@property
def sync_lock(self) -> pglock.advisory:
    """Advisory lock that keeps several syncs of this source from running in parallel.

    The lock id is built from the database schema name and the source slug.
    """
    lock_id = f"goauthentik.io/{connection.schema_name}/sources/kerberos/sync/{self.slug}"
    return pglock.advisory(lock_id=lock_id, timeout=0, side_effect=pglock.Return)
Lock for syncing Kerberos to prevent multiple parallel syncs happening
def get_base_user_properties(self, principal: str, **kwargs):
    """Return the base properties of a user created from a Kerberos principal.

    The username is everything before the last "@" of `principal`; a principal
    without "@" raises ValueError from the unpacking. If a `principal_obj`
    keyword argument is passed, `is_active` is set from its `expire_time`
    (active when it is None or lies in the future).
    """
    username, _realm = principal.rsplit("@", 1)

    base_properties = {
        "username": username,
        "type": UserTypes.INTERNAL,
        "path": self.get_user_path(),
    }

    if "principal_obj" not in kwargs:
        return base_properties

    expires_at = kwargs["principal_obj"].expire_time
    base_properties["is_active"] = expires_at is None or expires_at > now()
    return base_properties
Get base properties for a user to build final properties upon.
def get_base_group_properties(self, group_id: str, **kwargs):
    """Return the base properties of a group: its name is the given `group_id`.

    Extra keyword arguments are accepted and ignored.
    """
    group_properties = {"name": group_id}
    return group_properties
Get base properties for a group to build final properties upon.
@property
def tempdir(self) -> Path:
    """Return this source's directory for temporary Kerberos files, creating it if needed.

    The directory sits under the system temp dir and is namespaced by database
    schema name and source primary key.
    """
    directory = Path(
        gettempdir(),
        "authentik",
        connection.schema_name,
        "sources",
        "kerberos",
        str(self.pk),
    )
    directory.mkdir(mode=0o700, parents=True, exist_ok=True)
    return directory
Get temporary storage for Kerberos files
232 @property 233 def krb5_conf_path(self) -> str | None: 234 """Get krb5.conf path""" 235 if not self.krb5_conf: 236 return None 237 conf_path = self.tempdir / "krb5.conf" 238 conf_path.write_text(self.krb5_conf) 239 return str(conf_path)
Get krb5.conf path
def connection(self) -> KAdmin | None:
    """Get kadmin connection

    Connections are cached per source primary key in the module-level
    `_kadmin_connections` dict and reused on later calls.

    Returns:
        The cached or newly created connection, or None when
        `_kadmin_init()` could not create one (nothing is cached then).
    """
    cache_key = str(self.pk)
    kadm = _kadmin_connections.get(cache_key)
    if kadm is None:
        # Initialise exactly once and cache that very connection; calling
        # _kadmin_init() a second time would open another expensive connection.
        kadm = self._kadmin_init()
        if kadm is not None:
            _kadmin_connections[cache_key] = kadm
    return kadm
Get kadmin connection
def check_connection(self) -> dict[str, str | bool]:
    """Check the kadmin connection of this source.

    Returns a dict whose "status" is "ok", "no connection" or the text of a
    kadmin exception. "principal_exists" is added when the sync principal
    lookup succeeds. When user sync is disabled nothing is checked and the
    status stays "ok".
    """
    result: dict[str, str | bool] = {"status": "ok"}
    if self.sync_users:
        # Run the check with this source's krb5.conf in effect
        with Krb5ConfContext(self):
            try:
                kadm = self.connection()
                if kadm is not None:
                    result["principal_exists"] = kadm.principal_exists(self.sync_principal)
                else:
                    result["status"] = "no connection"
            except kadmin_exceptions.PyKAdminException as exc:
                result["status"] = str(exc)
    return result
Check Kerberos Connection
def get_gssapi_store(self) -> dict[str, str]:
    """Build the GSSAPI credential store mapping for this source.

    "ccache" is always present: the configured SPNEGO ccache, or a file in the
    source's temp dir when none is configured. "keytab" is present only when a
    SPNEGO keytab is configured.
    """
    store: dict[str, str] = {}

    if self.spnego_ccache:
        store["ccache"] = self.spnego_ccache
    else:
        default_ccache = self.tempdir / "spnego_ccache"
        default_ccache.touch(mode=0o600)
        store["ccache"] = f"FILE:{default_ccache}"

    keytab = self.spnego_keytab
    if keytab:
        if ":" not in keytab:
            # No type prefix: the value is base64 data, write it out as a file
            keytab_file = self.tempdir / "spnego_keytab"
            keytab_file.touch(mode=0o600)
            keytab_file.write_bytes(b64decode(keytab))
            keytab = f"FILE:{keytab_file}"
        # Otherwise it is already of the form type:residual and is used as-is
        store["keytab"] = keytab

    return store
Get GSSAPI credentials store for this source
def get_gssapi_creds(self) -> gssapi.creds.Credentials | None:
    """Return "accept" GSSAPI credentials for this source.

    When a SPNEGO server name is configured it is used as a host-based service
    name; otherwise no name is passed. A GSSAPI error is logged as a warning
    and None is returned.
    """
    server_name = None
    try:
        if self.spnego_server_name:
            # pylint: disable=c-extension-no-member
            server_name = gssapi.names.Name(
                base=self.spnego_server_name,
                name_type=gssapi.raw.types.NameType.hostbased_service,
            )
        return gssapi.creds.Credentials(
            usage="accept",
            name=server_name,
            store=self.get_gssapi_store(),
        )
    except gssapi.exceptions.GSSError as exc:
        LOGGER.warning("GSSAPI credentials failure", exc=exc)
        return None
Get GSSAPI credentials for this source
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.
Accessor to the related objects manager on the one-to-many relation created by GenericRelation.
In the example::
class Post(Model):
comments = GenericRelation(Comment)
post.comments is a ReverseGenericManyToOneDescriptor instance.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Inherited Members
- authentik.core.models.Source
- MANAGED_INBUILT
- name
- slug
- user_path_template
- enabled
- promoted
- user_property_mappings
- group_property_mappings
- icon
- authentication_flow
- enrollment_flow
- user_matching_mode
- group_matching_mode
- objects
- icon_themed_urls
- get_user_path
- managed
- authentication_flow_id
- enrollment_flow_id
- get_user_matching_mode_display
- get_group_matching_mode_display
- policybindingmodel_ptr_id
- policybindingmodel_ptr
- user_set
- usersourceconnection_set
- groupsourceconnection_set
- oauthsource
- samlsource
- kerberossource
- ldapsource
- identificationstage_set
- plexsource
- scimsource
- telegramsource
- sourcestage_set
The requested object does not exist
The query returned multiple objects when only one was expected.
class Krb5ConfContext:
    """
    Context manager to set the path to the krb5.conf config file.

    While active, the KRB5_CONFIG environment variable points at the source's
    custom krb5.conf. On exit the previous value is restored, or the variable
    is removed if it was not set before. Does nothing when the source has no
    custom krb5.conf.
    """

    def __init__(self, source: "KerberosSource"):
        self._source = source
        # Reading krb5_conf_path is what yields the path; falsy means no custom config
        self._path = self._source.krb5_conf_path
        self._previous: str | None = None

    def __enter__(self):
        if self._path:
            self._previous = os.environ.get("KRB5_CONFIG")
            os.environ["KRB5_CONFIG"] = self._path
        return self

    def __exit__(self, *args, **kwargs):
        if not self._path:
            return
        # Compare against None so a previously empty value is restored as-is
        if self._previous is not None:
            os.environ["KRB5_CONFIG"] = self._previous
        else:
            # pop() tolerates the variable having been removed inside the block
            os.environ.pop("KRB5_CONFIG", None)
Context manager to set the path to the krb5.conf config file.
class KerberosSourcePropertyMapping(PropertyMapping):
    """Map Kerberos Property to User object attribute"""

    class Meta:
        verbose_name = _("Kerberos Source Property Mapping")
        verbose_name_plural = _("Kerberos Source Property Mappings")

    def __str__(self):
        return str(self.name)

    @property
    def component(self) -> str:
        """Name of the frontend form component for this mapping type."""
        return "ak-property-mapping-source-kerberos-form"

    @property
    def serializer(self) -> type[Serializer]:
        """Return the serializer class used for this mapping type."""
        # Function-scope import, kept as in the original
        from authentik.sources.kerberos.api.property_mappings import (
            KerberosSourcePropertyMappingSerializer as serializer_class,
        )

        return serializer_class
Map Kerberos Property to User object attribute
@property
def serializer(self) -> type[Serializer]:
    """Return the serializer class used for Kerberos source property mappings."""
    # Function-scope import, kept as in the original
    from authentik.sources.kerberos.api.property_mappings import (
        KerberosSourcePropertyMappingSerializer as serializer_class,
    )

    return serializer_class
Get serializer for this model
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
    class Restaurant(Model):
        place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.core.models.PropertyMapping
- pm_uuid
- name
- expression
- objects
- evaluate
- managed
- provider_set
- source_userpropertymappings_set
- source_grouppropertymappings_set
- notificationwebhookmapping
- oauthsourcepropertymapping
- scopemapping
- endpoint_set
- racpropertymapping
- radiusproviderpropertymapping
- samlsourcepropertymapping
- samlpropertymapping
- scimprovider_set
- scimmapping
- kerberossourcepropertymapping
- ldapsourcepropertymapping
- plexsourcepropertymapping
- scimsourcepropertymapping
- telegramsourcepropertymapping
- googleworkspaceprovider_set
- googleworkspaceprovidermapping
- microsoftentraprovider_set
- microsoftentraprovidermapping
The requested object does not exist.
The query returned multiple objects when only one was expected.
class UserKerberosSourceConnection(UserSourceConnection):
    """Connection to configured Kerberos Sources."""

    class Meta:
        verbose_name = _("User Kerberos Source Connection")
        verbose_name_plural = _("User Kerberos Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Imported at call time rather than module load time
        # (presumably to avoid a models <-> api circular import; confirm).
        from authentik.sources.kerberos.api import source_connection as api

        return api.UserKerberosSourceConnectionSerializer
Connection to configured Kerberos Sources.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model"""
    # Imported at call time rather than module load time
    # (presumably to avoid a models <-> api circular import; confirm).
    from authentik.sources.kerberos.api import source_connection as api

    return api.UserKerberosSourceConnectionSerializer
Get serializer for this model
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
    class Restaurant(Model):
        place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.core.models.UserSourceConnection
- user
- source
- identifier
- objects
- created
- last_updated
- user_id
- source_id
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- id
- useroauthsourceconnection
- usersamlsourceconnection
- userkerberossourceconnection
- userldapsourceconnection
- userplexsourceconnection
- usertelegramsourceconnection
The requested object does not exist.
The query returned multiple objects when only one was expected.
class GroupKerberosSourceConnection(GroupSourceConnection):
    """Connection to configured Kerberos Sources."""

    class Meta:
        verbose_name = _("Group Kerberos Source Connection")
        verbose_name_plural = _("Group Kerberos Source Connections")

    @property
    def serializer(self) -> type[Serializer]:
        """Get serializer for this model"""
        # Imported at call time rather than module load time
        # (presumably to avoid a models <-> api circular import; confirm).
        from authentik.sources.kerberos.api import source_connection as api

        return api.GroupKerberosSourceConnectionSerializer
Connection to configured Kerberos Sources.
@property
def serializer(self) -> type[Serializer]:
    """Get serializer for this model"""
    # Imported at call time rather than module load time
    # (presumably to avoid a models <-> api circular import; confirm).
    from authentik.sources.kerberos.api import source_connection as api

    return api.GroupKerberosSourceConnectionSerializer
Get serializer for this model
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
    class Restaurant(Model):
        place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.core.models.GroupSourceConnection
- group
- source
- identifier
- objects
- created
- last_updated
- group_id
- source_id
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- id
- groupoauthsourceconnection
- groupsamlsourceconnection
- groupkerberossourceconnection
- groupldapsourceconnection
- groupplexsourceconnection
- grouptelegramsourceconnection
The requested object does not exist.
The query returned multiple objects when only one was expected.