authentik.stages.user_login.stage

Login stage logic

  1"""Login stage logic"""
  2
  3from datetime import datetime, timedelta
  4from hashlib import sha256
  5
  6from django.conf import settings
  7from django.contrib import messages
  8from django.contrib.auth import login
  9from django.http import HttpRequest, HttpResponse
 10from django.utils.translation import gettext as _
 11from jwt import PyJWTError, decode, encode
 12from rest_framework.fields import BooleanField, CharField
 13
 14from authentik.core.models import AuthenticatedSession, Session, User
 15from authentik.events.middleware import audit_ignore
 16from authentik.flows.challenge import ChallengeResponse, WithUserInfoChallenge
 17from authentik.flows.planner import PLAN_CONTEXT_PENDING_USER
 18from authentik.flows.stage import ChallengeStageView
 19from authentik.lib.utils.time import timedelta_from_string
 20from authentik.root.middleware import ClientIPMiddleware
 21from authentik.stages.password import BACKEND_INBUILT
 22from authentik.stages.password.stage import (
 23    PLAN_CONTEXT_AUTHENTICATION_BACKEND,
 24    PLAN_CONTEXT_METHOD_ARGS,
 25)
 26from authentik.stages.user_login.middleware import (
 27    SESSION_KEY_BINDING_GEO,
 28    SESSION_KEY_BINDING_NET,
 29)
 30from authentik.stages.user_login.models import UserLoginStage
 31from authentik.tenants.utils import get_unique_identifier
 32
 33COOKIE_NAME_KNOWN_DEVICE = "authentik_device"
 34
 35PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE = "known_device"
 36
 37
class UserLoginChallenge(WithUserInfoChallenge):
    """Challenge sent to the client by the user login stage.

    Declares no fields of its own beyond `component`; everything else comes from
    `WithUserInfoChallenge`."""

    # Component identifier, defaulting to "ak-stage-user-login".
    component = CharField(default="ak-stage-user-login")
 42
 43
class UserLoginChallengeResponse(ChallengeResponse):
    """Response to the user login challenge, carrying the user's remember-me choice"""

    # Component identifier, same default as the challenge this responds to.
    component = CharField(default="ak-stage-user-login")

    # Required boolean; `UserLoginStageView.challenge_valid` forwards it to `do_login`
    # as the `remember` argument.
    remember_me = BooleanField(required=True)
 50
 51
 52class UserLoginStageView(ChallengeStageView):
 53    """Finalise Authentication flow by logging the user in"""
 54
 55    response_class = UserLoginChallengeResponse
 56
 57    def get_challenge(self, *args, **kwargs) -> UserLoginChallenge:
 58        return UserLoginChallenge(data={})
 59
 60    def dispatch(self, request: HttpRequest) -> HttpResponse:
 61        """Check for remember_me, and do login"""
 62        stage: UserLoginStage = self.executor.current_stage
 63        if timedelta_from_string(stage.remember_me_offset).total_seconds() > 0:
 64            return super().dispatch(request)
 65        return self.do_login(request)
 66
 67    def challenge_valid(self, response: UserLoginChallengeResponse) -> HttpResponse:
 68        return self.do_login(self.request, response.validated_data["remember_me"])
 69
 70    def set_session_duration(self, remember: bool) -> timedelta:
 71        """Update the sessions' expiry"""
 72        delta = timedelta_from_string(self.executor.current_stage.session_duration)
 73        if remember:
 74            offset = timedelta_from_string(self.executor.current_stage.remember_me_offset)
 75            delta = delta + offset
 76        if delta.total_seconds() == 0:
 77            self.request.session.set_expiry(0)
 78        else:
 79            self.request.session.set_expiry(delta)
 80        return delta
 81
 82    def set_session_ip(self):
 83        """Set the sessions' last IP and session bindings"""
 84        stage: UserLoginStage = self.executor.current_stage
 85
 86        self.request.session[self.request.session.model.Keys.LAST_IP] = (
 87            ClientIPMiddleware.get_client_ip(self.request)
 88        )
 89        self.request.session[SESSION_KEY_BINDING_NET] = stage.network_binding
 90        self.request.session[SESSION_KEY_BINDING_GEO] = stage.geoip_binding
 91
 92    # FIXME: identical function in authenticator_validate
 93    @property
 94    def cookie_jwt_key(self) -> str:
 95        """Signing key for Known-device Cookie for this stage"""
 96        return sha256(
 97            f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii")
 98        ).hexdigest()
 99
100    def set_known_device_cookie(self, user: User):
101        """Set a cookie, valid longer than the session, which denotes that this user
102        has logged in on this device before."""
103        delta = timedelta_from_string(self.executor.current_stage.remember_device)
104        response = self.executor.stage_ok()
105        if delta.total_seconds() < 1:
106            return response
107        expiry = datetime.now() + delta
108        cookie_payload = {
109            "sub": user.uid,
110            "exp": expiry.timestamp(),
111        }
112        cookie = encode(cookie_payload, self.cookie_jwt_key)
113        response.set_cookie(
114            COOKIE_NAME_KNOWN_DEVICE,
115            cookie,
116            expires=expiry,
117            path=settings.SESSION_COOKIE_PATH,
118            domain=settings.SESSION_COOKIE_DOMAIN,
119            samesite=settings.SESSION_COOKIE_SAMESITE,
120        )
121        return response
122
123    def is_known_device(self, user: User):
124        """Returns `True` if the login happened on a "known" device, by the same user."""
125        client_ip = ClientIPMiddleware.get_client_ip(self.request)
126        if AuthenticatedSession.objects.filter(session__last_ip=client_ip, user=user).exists():
127            return True
128        if COOKIE_NAME_KNOWN_DEVICE not in self.request.COOKIES:
129            return False
130        try:
131            payload = decode(
132                self.request.COOKIES[COOKIE_NAME_KNOWN_DEVICE], self.cookie_jwt_key, ["HS256"]
133            )
134            if payload["sub"] == user.uid:
135                return True
136            return False
137        except (PyJWTError, ValueError, TypeError) as exc:
138            self.logger.info("eh", exc=exc)
139            return False
140
141    def do_login(self, request: HttpRequest, remember: bool | None = None) -> HttpResponse:
142        """Attach the currently pending user to the current session.
143        `remember` Argument should be `None` if not configured, otherwise set to `True`/`False`
144        representative of the user's choice."""
145        if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context:
146            message = _("No Pending user to login.")
147            messages.error(request, message)
148            self.logger.warning(message)
149            return self.executor.stage_invalid()
150        backend = self.executor.plan.context.get(
151            PLAN_CONTEXT_AUTHENTICATION_BACKEND, BACKEND_INBUILT
152        )
153        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
154        if not user.is_active:
155            self.logger.warning("User is not active, login will not work.")
156            return self.executor.stage_invalid()
157        delta = self.set_session_duration(bool(remember))
158        self.set_session_ip()
159        # Check if the login request is coming from a known device
160        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
161        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault(
162            PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE, self.is_known_device(user)
163        )
164        # the `user_logged_in` signal will update the user to write the `last_login` field
165        # which we don't want to log as we already have a dedicated login event
166        with audit_ignore():
167            login(
168                self.request,
169                user,
170                backend=backend,
171            )
172        self.logger.debug(
173            "Logged in",
174            backend=backend,
175            user=user.username,
176            flow_slug=self.executor.flow.slug,
177            session_duration=delta,
178        )
179        if self.executor.current_stage.terminate_other_sessions:
180            Session.objects.filter(
181                authenticatedsession__user=user,
182            ).exclude(session_key=self.request.session.session_key).delete()
183        if remember is None:
184            return self.set_known_device_cookie(user)
185        return self.executor.stage_ok()
PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE = 'known_device'
class UserLoginChallenge(authentik.flows.challenge.WithUserInfoChallenge):
39class UserLoginChallenge(WithUserInfoChallenge):
40    """Empty challenge"""
41
42    component = CharField(default="ak-stage-user-login")

Empty challenge

component
class UserLoginChallengeResponse(authentik.flows.challenge.ChallengeResponse):
45class UserLoginChallengeResponse(ChallengeResponse):
46    """User login challenge"""
47
48    component = CharField(default="ak-stage-user-login")
49
50    remember_me = BooleanField(required=True)

Response to the user login challenge, carrying the user's remember-me choice

component
remember_me
class UserLoginStageView(authentik.flows.stage.ChallengeStageView):
 53class UserLoginStageView(ChallengeStageView):
 54    """Finalise Authentication flow by logging the user in"""
 55
 56    response_class = UserLoginChallengeResponse
 57
 58    def get_challenge(self, *args, **kwargs) -> UserLoginChallenge:
 59        return UserLoginChallenge(data={})
 60
 61    def dispatch(self, request: HttpRequest) -> HttpResponse:
 62        """Check for remember_me, and do login"""
 63        stage: UserLoginStage = self.executor.current_stage
 64        if timedelta_from_string(stage.remember_me_offset).total_seconds() > 0:
 65            return super().dispatch(request)
 66        return self.do_login(request)
 67
 68    def challenge_valid(self, response: UserLoginChallengeResponse) -> HttpResponse:
 69        return self.do_login(self.request, response.validated_data["remember_me"])
 70
 71    def set_session_duration(self, remember: bool) -> timedelta:
 72        """Update the sessions' expiry"""
 73        delta = timedelta_from_string(self.executor.current_stage.session_duration)
 74        if remember:
 75            offset = timedelta_from_string(self.executor.current_stage.remember_me_offset)
 76            delta = delta + offset
 77        if delta.total_seconds() == 0:
 78            self.request.session.set_expiry(0)
 79        else:
 80            self.request.session.set_expiry(delta)
 81        return delta
 82
 83    def set_session_ip(self):
 84        """Set the sessions' last IP and session bindings"""
 85        stage: UserLoginStage = self.executor.current_stage
 86
 87        self.request.session[self.request.session.model.Keys.LAST_IP] = (
 88            ClientIPMiddleware.get_client_ip(self.request)
 89        )
 90        self.request.session[SESSION_KEY_BINDING_NET] = stage.network_binding
 91        self.request.session[SESSION_KEY_BINDING_GEO] = stage.geoip_binding
 92
 93    # FIXME: identical function in authenticator_validate
 94    @property
 95    def cookie_jwt_key(self) -> str:
 96        """Signing key for Known-device Cookie for this stage"""
 97        return sha256(
 98            f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii")
 99        ).hexdigest()
100
101    def set_known_device_cookie(self, user: User):
102        """Set a cookie, valid longer than the session, which denotes that this user
103        has logged in on this device before."""
104        delta = timedelta_from_string(self.executor.current_stage.remember_device)
105        response = self.executor.stage_ok()
106        if delta.total_seconds() < 1:
107            return response
108        expiry = datetime.now() + delta
109        cookie_payload = {
110            "sub": user.uid,
111            "exp": expiry.timestamp(),
112        }
113        cookie = encode(cookie_payload, self.cookie_jwt_key)
114        response.set_cookie(
115            COOKIE_NAME_KNOWN_DEVICE,
116            cookie,
117            expires=expiry,
118            path=settings.SESSION_COOKIE_PATH,
119            domain=settings.SESSION_COOKIE_DOMAIN,
120            samesite=settings.SESSION_COOKIE_SAMESITE,
121        )
122        return response
123
124    def is_known_device(self, user: User):
125        """Returns `True` if the login happened on a "known" device, by the same user."""
126        client_ip = ClientIPMiddleware.get_client_ip(self.request)
127        if AuthenticatedSession.objects.filter(session__last_ip=client_ip, user=user).exists():
128            return True
129        if COOKIE_NAME_KNOWN_DEVICE not in self.request.COOKIES:
130            return False
131        try:
132            payload = decode(
133                self.request.COOKIES[COOKIE_NAME_KNOWN_DEVICE], self.cookie_jwt_key, ["HS256"]
134            )
135            if payload["sub"] == user.uid:
136                return True
137            return False
138        except (PyJWTError, ValueError, TypeError) as exc:
139            self.logger.info("eh", exc=exc)
140            return False
141
142    def do_login(self, request: HttpRequest, remember: bool | None = None) -> HttpResponse:
143        """Attach the currently pending user to the current session.
144        `remember` Argument should be `None` if not configured, otherwise set to `True`/`False`
145        representative of the user's choice."""
146        if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context:
147            message = _("No Pending user to login.")
148            messages.error(request, message)
149            self.logger.warning(message)
150            return self.executor.stage_invalid()
151        backend = self.executor.plan.context.get(
152            PLAN_CONTEXT_AUTHENTICATION_BACKEND, BACKEND_INBUILT
153        )
154        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
155        if not user.is_active:
156            self.logger.warning("User is not active, login will not work.")
157            return self.executor.stage_invalid()
158        delta = self.set_session_duration(bool(remember))
159        self.set_session_ip()
160        # Check if the login request is coming from a known device
161        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
162        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault(
163            PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE, self.is_known_device(user)
164        )
165        # the `user_logged_in` signal will update the user to write the `last_login` field
166        # which we don't want to log as we already have a dedicated login event
167        with audit_ignore():
168            login(
169                self.request,
170                user,
171                backend=backend,
172            )
173        self.logger.debug(
174            "Logged in",
175            backend=backend,
176            user=user.username,
177            flow_slug=self.executor.flow.slug,
178            session_duration=delta,
179        )
180        if self.executor.current_stage.terminate_other_sessions:
181            Session.objects.filter(
182                authenticatedsession__user=user,
183            ).exclude(session_key=self.request.session.session_key).delete()
184        if remember is None:
185            return self.set_known_device_cookie(user)
186        return self.executor.stage_ok()

Finalise Authentication flow by logging the user in

response_class = <class 'UserLoginChallengeResponse'>
def get_challenge( self, *args, **kwargs) -> UserLoginChallenge:
58    def get_challenge(self, *args, **kwargs) -> UserLoginChallenge:
59        return UserLoginChallenge(data={})

Return the challenge that the client should solve

def dispatch( self, request: django.http.request.HttpRequest) -> django.http.response.HttpResponse:
61    def dispatch(self, request: HttpRequest) -> HttpResponse:
62        """Check for remember_me, and do login"""
63        stage: UserLoginStage = self.executor.current_stage
64        if timedelta_from_string(stage.remember_me_offset).total_seconds() > 0:
65            return super().dispatch(request)
66        return self.do_login(request)

Check for remember_me, and do login

def challenge_valid( self, response: UserLoginChallengeResponse) -> django.http.response.HttpResponse:
68    def challenge_valid(self, response: UserLoginChallengeResponse) -> HttpResponse:
69        return self.do_login(self.request, response.validated_data["remember_me"])

Callback when the challenge has the correct format

def set_session_duration(self, remember: bool) -> datetime.timedelta:
71    def set_session_duration(self, remember: bool) -> timedelta:
72        """Update the sessions' expiry"""
73        delta = timedelta_from_string(self.executor.current_stage.session_duration)
74        if remember:
75            offset = timedelta_from_string(self.executor.current_stage.remember_me_offset)
76            delta = delta + offset
77        if delta.total_seconds() == 0:
78            self.request.session.set_expiry(0)
79        else:
80            self.request.session.set_expiry(delta)
81        return delta

Update the session's expiry

def set_session_ip(self):
83    def set_session_ip(self):
84        """Set the sessions' last IP and session bindings"""
85        stage: UserLoginStage = self.executor.current_stage
86
87        self.request.session[self.request.session.model.Keys.LAST_IP] = (
88            ClientIPMiddleware.get_client_ip(self.request)
89        )
90        self.request.session[SESSION_KEY_BINDING_NET] = stage.network_binding
91        self.request.session[SESSION_KEY_BINDING_GEO] = stage.geoip_binding

Set the session's last IP and session bindings

cookie_jwt_key: str
94    @property
95    def cookie_jwt_key(self) -> str:
96        """Signing key for Known-device Cookie for this stage"""
97        return sha256(
98            f"{get_unique_identifier()}:{self.executor.current_stage.pk.hex}".encode("ascii")
99        ).hexdigest()

Signing key for Known-device Cookie for this stage

def is_known_device(self, user: authentik.core.models.User):
124    def is_known_device(self, user: User):
125        """Returns `True` if the login happened on a "known" device, by the same user."""
126        client_ip = ClientIPMiddleware.get_client_ip(self.request)
127        if AuthenticatedSession.objects.filter(session__last_ip=client_ip, user=user).exists():
128            return True
129        if COOKIE_NAME_KNOWN_DEVICE not in self.request.COOKIES:
130            return False
131        try:
132            payload = decode(
133                self.request.COOKIES[COOKIE_NAME_KNOWN_DEVICE], self.cookie_jwt_key, ["HS256"]
134            )
135            if payload["sub"] == user.uid:
136                return True
137            return False
138        except (PyJWTError, ValueError, TypeError) as exc:
139            self.logger.info("eh", exc=exc)
140            return False

Returns True if the login happened on a "known" device, by the same user.

def do_login( self, request: django.http.request.HttpRequest, remember: bool | None = None) -> django.http.response.HttpResponse:
142    def do_login(self, request: HttpRequest, remember: bool | None = None) -> HttpResponse:
143        """Attach the currently pending user to the current session.
144        `remember` Argument should be `None` if not configured, otherwise set to `True`/`False`
145        representative of the user's choice."""
146        if PLAN_CONTEXT_PENDING_USER not in self.executor.plan.context:
147            message = _("No Pending user to login.")
148            messages.error(request, message)
149            self.logger.warning(message)
150            return self.executor.stage_invalid()
151        backend = self.executor.plan.context.get(
152            PLAN_CONTEXT_AUTHENTICATION_BACKEND, BACKEND_INBUILT
153        )
154        user: User = self.executor.plan.context[PLAN_CONTEXT_PENDING_USER]
155        if not user.is_active:
156            self.logger.warning("User is not active, login will not work.")
157            return self.executor.stage_invalid()
158        delta = self.set_session_duration(bool(remember))
159        self.set_session_ip()
160        # Check if the login request is coming from a known device
161        self.executor.plan.context.setdefault(PLAN_CONTEXT_METHOD_ARGS, {})
162        self.executor.plan.context[PLAN_CONTEXT_METHOD_ARGS].setdefault(
163            PLAN_CONTEXT_METHOD_ARGS_KNOWN_DEVICE, self.is_known_device(user)
164        )
165        # the `user_logged_in` signal will update the user to write the `last_login` field
166        # which we don't want to log as we already have a dedicated login event
167        with audit_ignore():
168            login(
169                self.request,
170                user,
171                backend=backend,
172            )
173        self.logger.debug(
174            "Logged in",
175            backend=backend,
176            user=user.username,
177            flow_slug=self.executor.flow.slug,
178            session_duration=delta,
179        )
180        if self.executor.current_stage.terminate_other_sessions:
181            Session.objects.filter(
182                authenticatedsession__user=user,
183            ).exclude(session_key=self.request.session.session_key).delete()
184        if remember is None:
185            return self.set_known_device_cookie(user)
186        return self.executor.stage_ok()

Attach the currently pending user to the current session. The `remember` argument should be `None` if remember-me is not configured; otherwise it is `True`/`False`, representing the user's choice.