authentik.core.auth

Authenticate with tokens

 1"""Authenticate with tokens"""
 2
 3from typing import Any
 4
 5from django.contrib.auth.backends import ModelBackend
 6from django.http.request import HttpRequest
 7
 8from authentik.core.models import Token, TokenIntents, User
 9from authentik.events.utils import cleanse_dict, sanitize_dict
10from authentik.flows.planner import FlowPlan
11from authentik.flows.views.executor import SESSION_KEY_PLAN
12from authentik.stages.password.stage import PLAN_CONTEXT_METHOD, PLAN_CONTEXT_METHOD_ARGS
13
14
15class ModelBackendNoAuthz(ModelBackend):
16    def get_user_permissions(self, user_obj, obj=None):
17        return set()
18
19    def get_group_permissions(self, user_obj, obj=None):
20        return set()
21
22    def get_all_permissions(self, user_obj, obj=None):
23        return set()
24
25    def has_perm(self, user_obj, perm, obj=None):
26        return False
27
28    def has_module_perms(self, user_obj, app_label):
29        return False
30
31    def with_perm(self, perm, is_active=True, include_superusers=True, obj=None):
32        return User.objects.none()
33
34
35class InbuiltBackend(ModelBackendNoAuthz):
36    """Inbuilt backend"""
37
38    def authenticate(
39        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
40    ) -> User | None:
41        user = super().authenticate(request, username=username, password=password, **kwargs)
42        if not user:
43            return None
44        self.set_method("password", request)
45        return user
46
47    async def aauthenticate(
48        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
49    ) -> User | None:
50        user = await super().aauthenticate(request, username=username, password=password, **kwargs)
51        if not user:
52            return None
53        self.set_method("password", request)
54        return user
55
56    def set_method(self, method: str, request: HttpRequest | None, **kwargs):
57        """Set method data on current flow, if possbiel"""
58        if not request:
59            return
60        # Since we can't directly pass other variables to signals, and we want to log the method
61        # and the token used, we assume we're running in a flow and set a variable in the context
62        flow_plan: FlowPlan = request.session.get(SESSION_KEY_PLAN, FlowPlan(""))
63        flow_plan.context.setdefault(PLAN_CONTEXT_METHOD, method)
64        flow_plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
65        flow_plan.context[PLAN_CONTEXT_METHOD_ARGS].update(cleanse_dict(sanitize_dict(kwargs)))
66        request.session[SESSION_KEY_PLAN] = flow_plan
67
68
69class TokenBackend(InbuiltBackend):
70    """Authenticate with token"""
71
72    def authenticate(
73        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
74    ) -> User | None:
75        try:
76            user = User._default_manager.get_by_natural_key(username)
77
78        except User.DoesNotExist:
79            # Run the default password hasher once to reduce the timing
80            # difference between an existing and a nonexistent user (#20760).
81            User().set_password(password, request=request)
82            return None
83
84        tokens = Token.objects.filter(
85            user=user, key=password, intent=TokenIntents.INTENT_APP_PASSWORD
86        )
87        if not tokens.exists():
88            return None
89        token = tokens.first()
90        self.set_method("token", request, token=token)
91        return token.user
class ModelBackendNoAuthz(django.contrib.auth.backends.ModelBackend):
16class ModelBackendNoAuthz(ModelBackend):
17    def get_user_permissions(self, user_obj, obj=None):
18        return set()
19
20    def get_group_permissions(self, user_obj, obj=None):
21        return set()
22
23    def get_all_permissions(self, user_obj, obj=None):
24        return set()
25
26    def has_perm(self, user_obj, perm, obj=None):
27        return False
28
29    def has_module_perms(self, user_obj, app_label):
30        return False
31
32    def with_perm(self, perm, is_active=True, include_superusers=True, obj=None):
33        return User.objects.none()

Authenticates against settings.AUTH_USER_MODEL.

def get_user_permissions(self, user_obj, obj=None):
17    def get_user_permissions(self, user_obj, obj=None):
18        return set()

Return a set of permission strings the user user_obj has from their user_permissions.

def get_group_permissions(self, user_obj, obj=None):
20    def get_group_permissions(self, user_obj, obj=None):
21        return set()

Return a set of permission strings the user user_obj has from the groups they belong.

def get_all_permissions(self, user_obj, obj=None):
23    def get_all_permissions(self, user_obj, obj=None):
24        return set()
def has_perm(self, user_obj, perm, obj=None):
26    def has_perm(self, user_obj, perm, obj=None):
27        return False
def has_module_perms(self, user_obj, app_label):
29    def has_module_perms(self, user_obj, app_label):
30        return False

Return True if user_obj has any permissions in the given app_label.

def with_perm(self, perm, is_active=True, include_superusers=True, obj=None):
32    def with_perm(self, perm, is_active=True, include_superusers=True, obj=None):
33        return User.objects.none()

Return users that have permission "perm". By default, filter out inactive users and include superusers.

class InbuiltBackend(ModelBackendNoAuthz):
36class InbuiltBackend(ModelBackendNoAuthz):
37    """Inbuilt backend"""
38
39    def authenticate(
40        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
41    ) -> User | None:
42        user = super().authenticate(request, username=username, password=password, **kwargs)
43        if not user:
44            return None
45        self.set_method("password", request)
46        return user
47
48    async def aauthenticate(
49        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
50    ) -> User | None:
51        user = await super().aauthenticate(request, username=username, password=password, **kwargs)
52        if not user:
53            return None
54        self.set_method("password", request)
55        return user
56
57    def set_method(self, method: str, request: HttpRequest | None, **kwargs):
58        """Set method data on current flow, if possbiel"""
59        if not request:
60            return
61        # Since we can't directly pass other variables to signals, and we want to log the method
62        # and the token used, we assume we're running in a flow and set a variable in the context
63        flow_plan: FlowPlan = request.session.get(SESSION_KEY_PLAN, FlowPlan(""))
64        flow_plan.context.setdefault(PLAN_CONTEXT_METHOD, method)
65        flow_plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
66        flow_plan.context[PLAN_CONTEXT_METHOD_ARGS].update(cleanse_dict(sanitize_dict(kwargs)))
67        request.session[SESSION_KEY_PLAN] = flow_plan

Inbuilt backend

def authenticate( self, request: django.http.request.HttpRequest, username: str | None, password: str | None, **kwargs: Any) -> authentik.core.models.User | None:
39    def authenticate(
40        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
41    ) -> User | None:
42        user = super().authenticate(request, username=username, password=password, **kwargs)
43        if not user:
44            return None
45        self.set_method("password", request)
46        return user
async def aauthenticate( self, request: django.http.request.HttpRequest, username: str | None, password: str | None, **kwargs: Any) -> authentik.core.models.User | None:
48    async def aauthenticate(
49        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
50    ) -> User | None:
51        user = await super().aauthenticate(request, username=username, password=password, **kwargs)
52        if not user:
53            return None
54        self.set_method("password", request)
55        return user
def set_method( self, method: str, request: django.http.request.HttpRequest | None, **kwargs):
57    def set_method(self, method: str, request: HttpRequest | None, **kwargs):
58        """Set method data on current flow, if possbiel"""
59        if not request:
60            return
61        # Since we can't directly pass other variables to signals, and we want to log the method
62        # and the token used, we assume we're running in a flow and set a variable in the context
63        flow_plan: FlowPlan = request.session.get(SESSION_KEY_PLAN, FlowPlan(""))
64        flow_plan.context.setdefault(PLAN_CONTEXT_METHOD, method)
65        flow_plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
66        flow_plan.context[PLAN_CONTEXT_METHOD_ARGS].update(cleanse_dict(sanitize_dict(kwargs)))
67        request.session[SESSION_KEY_PLAN] = flow_plan

Set method data on current flow, if possbiel

class TokenBackend(InbuiltBackend):
70class TokenBackend(InbuiltBackend):
71    """Authenticate with token"""
72
73    def authenticate(
74        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
75    ) -> User | None:
76        try:
77            user = User._default_manager.get_by_natural_key(username)
78
79        except User.DoesNotExist:
80            # Run the default password hasher once to reduce the timing
81            # difference between an existing and a nonexistent user (#20760).
82            User().set_password(password, request=request)
83            return None
84
85        tokens = Token.objects.filter(
86            user=user, key=password, intent=TokenIntents.INTENT_APP_PASSWORD
87        )
88        if not tokens.exists():
89            return None
90        token = tokens.first()
91        self.set_method("token", request, token=token)
92        return token.user

Authenticate with token

def authenticate( self, request: django.http.request.HttpRequest, username: str | None, password: str | None, **kwargs: Any) -> authentik.core.models.User | None:
73    def authenticate(
74        self, request: HttpRequest, username: str | None, password: str | None, **kwargs: Any
75    ) -> User | None:
76        try:
77            user = User._default_manager.get_by_natural_key(username)
78
79        except User.DoesNotExist:
80            # Run the default password hasher once to reduce the timing
81            # difference between an existing and a nonexistent user (#20760).
82            User().set_password(password, request=request)
83            return None
84
85        tokens = Token.objects.filter(
86            user=user, key=password, intent=TokenIntents.INTENT_APP_PASSWORD
87        )
88        if not tokens.exists():
89            return None
90        token = tokens.first()
91        self.set_method("token", request, token=token)
92        return token.user