authentik.core.api.tokens

Tokens API Viewset

  1"""Tokens API Viewset"""
  2
  3from typing import Any
  4
  5from django.utils.timezone import now
  6from drf_spectacular.utils import OpenApiResponse, extend_schema
  7from rest_framework.decorators import action
  8from rest_framework.exceptions import ValidationError
  9from rest_framework.fields import CharField
 10from rest_framework.request import Request
 11from rest_framework.response import Response
 12from rest_framework.viewsets import ModelViewSet
 13
 14from authentik.api.validation import validate
 15from authentik.blueprints.api import ManagedSerializer
 16from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
 17from authentik.core.api.used_by import UsedByMixin
 18from authentik.core.api.users import UserSerializer
 19from authentik.core.api.utils import ModelSerializer, PassiveSerializer
 20from authentik.core.models import (
 21    USER_ATTRIBUTE_TOKEN_EXPIRING,
 22    USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
 23    Token,
 24    TokenIntents,
 25    User,
 26    default_token_duration,
 27)
 28from authentik.events.models import Event, EventAction
 29from authentik.events.utils import model_to_dict
 30from authentik.lib.utils.time import timedelta_from_string
 31from authentik.rbac.decorators import permission_required
 32
 33
 34class TokenSerializer(ManagedSerializer, ModelSerializer):
 35    """Token Serializer"""
 36
 37    user_obj = UserSerializer(required=False, source="user", read_only=True)
 38
 39    def __init__(self, *args, **kwargs) -> None:
 40        super().__init__(*args, **kwargs)
 41        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
 42            self.fields["key"] = CharField(required=False)
 43
 44    def validate_user(self, user: User):
 45        """Ensure user of token cannot be changed"""
 46        if self.instance and self.instance.user_id:
 47            if user.pk != self.instance.user_id:
 48                raise ValidationError("User cannot be changed")
 49        return user
 50
 51    def validate(self, attrs: dict[Any, str]) -> dict[Any, str]:
 52        """Ensure only API or App password tokens are created."""
 53        request: Request = self.context.get("request")
 54        if not request:
 55            if "user" not in attrs:
 56                raise ValidationError("Missing user")
 57            if "intent" not in attrs:
 58                raise ValidationError("Missing intent")
 59        else:
 60            attrs.setdefault("user", request.user)
 61        attrs.setdefault("intent", TokenIntents.INTENT_API)
 62        if attrs.get("intent") not in [TokenIntents.INTENT_API, TokenIntents.INTENT_APP_PASSWORD]:
 63            raise ValidationError({"intent": f"Invalid intent {attrs.get('intent')}"})
 64
 65        if attrs.get("intent") == TokenIntents.INTENT_APP_PASSWORD:
 66            # user IS in attrs
 67            user: User = attrs.get("user")
 68            max_token_lifetime = user.group_attributes(request).get(
 69                USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
 70            )
 71            max_token_lifetime_dt = default_token_duration()
 72            if max_token_lifetime is not None:
 73                try:
 74                    max_token_lifetime_dt = now() + timedelta_from_string(max_token_lifetime)
 75                except ValueError:
 76                    pass
 77
 78            expires = attrs.get("expires")
 79            if expires is not None and expires > max_token_lifetime_dt:
 80                raise ValidationError(
 81                    {
 82                        "expires": (
 83                            f"Token expires exceeds maximum lifetime ({max_token_lifetime_dt} UTC)."
 84                        )
 85                    }
 86                )
 87        elif attrs.get("intent") == TokenIntents.INTENT_API:
 88            # For API tokens, expires cannot be overridden
 89            attrs["expires"] = default_token_duration()
 90
 91        return attrs
 92
 93    class Meta:
 94        model = Token
 95        fields = [
 96            "pk",
 97            "managed",
 98            "identifier",
 99            "intent",
100            "user",
101            "user_obj",
102            "description",
103            "expires",
104            "expiring",
105        ]
106        extra_kwargs = {
107            "user": {"required": False},
108        }
109
110
class TokenSetKeySerializer(PassiveSerializer):
    """Set token's key"""

    # New key value supplied by the caller; required (CharField default).
    key = CharField()
115
116
class TokenViewSerializer(PassiveSerializer):
    """Show token's current key"""

    # Output-only: the key is rendered in responses and never read from input.
    key = CharField(read_only=True)
121
122
class TokenViewSet(UsedByMixin, ModelViewSet):
    """Token Viewset"""

    # Tokens are addressed by their identifier, and expired tokens are included.
    lookup_field = "identifier"
    queryset = Token.objects.including_expired().all()
    serializer_class = TokenSerializer
    search_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
    ]
    filterset_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
        "expires",
        "expiring",
        "managed",
    ]
    ordering = ["identifier", "expires"]
    owner_field = "user"
    rbac_allow_create_without_perm = True

    def perform_create(self, serializer: TokenSerializer):
        """Save a new token, pinning owner and `expiring` for non-superusers.

        Superusers use the inherited `perform_create`. For everyone else the
        token is saved with the requester as `user` and with `expiring` read
        from the requester's USER_ATTRIBUTE_TOKEN_EXPIRING attribute (True when
        unset); `assign_perms_to_managed_role` is then called for
        `authentik_core.view_token_key` on the new token.
        """
        requester = self.request.user
        if requester.is_superuser:
            return super().perform_create(serializer)
        instance = serializer.save(
            user=requester,
            expiring=requester.attributes.get(USER_ATTRIBUTE_TOKEN_EXPIRING, True),
        )
        requester.assign_perms_to_managed_role("authentik_core.view_token_key", instance)
        return instance

    @permission_required("authentik_core.view_token_key")
    @extend_schema(
        responses={
            200: TokenViewSerializer(many=False),
            404: OpenApiResponse(description="Token not found or expired"),
        }
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["GET"])
    def view_key(self, request: Request, identifier: str) -> Response:
        """Return token key and log access"""
        token: Token = self.get_object()
        # The SECRET_VIEW event is created before the key is put into the response.
        Event.new(EventAction.SECRET_VIEW, secret=token).from_http(request)  # noqa # nosec
        payload = TokenViewSerializer({"key": token.key})
        return Response(payload.data)

    @permission_required("authentik_core.set_token_key")
    @extend_schema(
        request=TokenSetKeySerializer(),
        responses={
            204: OpenApiResponse(description="Successfully changed key"),
            400: OpenApiResponse(description="Missing key"),
            404: OpenApiResponse(description="Token not found or expired"),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(TokenSetKeySerializer)
    def set_key(self, request: Request, identifier: str, body: TokenSetKeySerializer) -> Response:
        """Set token key. Action is logged as event. `authentik_core.set_token_key` permission
        is required."""
        token: Token = self.get_object()
        new_key = body.validated_data.get("key")
        if not new_key:
            return Response(status=400)
        token.key = new_key
        token.save()
        # The MODEL_UPDATED event is built from the token state after saving.
        updated_event = Event.new(EventAction.MODEL_UPDATED, model=model_to_dict(token))
        updated_event.from_http(request)  # noqa # nosec
        return Response(status=204)
 35class TokenSerializer(ManagedSerializer, ModelSerializer):
 36    """Token Serializer"""
 37
 38    user_obj = UserSerializer(required=False, source="user", read_only=True)
 39
 40    def __init__(self, *args, **kwargs) -> None:
 41        super().__init__(*args, **kwargs)
 42        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
 43            self.fields["key"] = CharField(required=False)
 44
 45    def validate_user(self, user: User):
 46        """Ensure user of token cannot be changed"""
 47        if self.instance and self.instance.user_id:
 48            if user.pk != self.instance.user_id:
 49                raise ValidationError("User cannot be changed")
 50        return user
 51
 52    def validate(self, attrs: dict[Any, str]) -> dict[Any, str]:
 53        """Ensure only API or App password tokens are created."""
 54        request: Request = self.context.get("request")
 55        if not request:
 56            if "user" not in attrs:
 57                raise ValidationError("Missing user")
 58            if "intent" not in attrs:
 59                raise ValidationError("Missing intent")
 60        else:
 61            attrs.setdefault("user", request.user)
 62        attrs.setdefault("intent", TokenIntents.INTENT_API)
 63        if attrs.get("intent") not in [TokenIntents.INTENT_API, TokenIntents.INTENT_APP_PASSWORD]:
 64            raise ValidationError({"intent": f"Invalid intent {attrs.get('intent')}"})
 65
 66        if attrs.get("intent") == TokenIntents.INTENT_APP_PASSWORD:
 67            # user IS in attrs
 68            user: User = attrs.get("user")
 69            max_token_lifetime = user.group_attributes(request).get(
 70                USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
 71            )
 72            max_token_lifetime_dt = default_token_duration()
 73            if max_token_lifetime is not None:
 74                try:
 75                    max_token_lifetime_dt = now() + timedelta_from_string(max_token_lifetime)
 76                except ValueError:
 77                    pass
 78
 79            expires = attrs.get("expires")
 80            if expires is not None and expires > max_token_lifetime_dt:
 81                raise ValidationError(
 82                    {
 83                        "expires": (
 84                            f"Token expires exceeds maximum lifetime ({max_token_lifetime_dt} UTC)."
 85                        )
 86                    }
 87                )
 88        elif attrs.get("intent") == TokenIntents.INTENT_API:
 89            # For API tokens, expires cannot be overridden
 90            attrs["expires"] = default_token_duration()
 91
 92        return attrs
 93
 94    class Meta:
 95        model = Token
 96        fields = [
 97            "pk",
 98            "managed",
 99            "identifier",
100            "intent",
101            "user",
102            "user_obj",
103            "description",
104            "expires",
105            "expiring",
106        ]
107        extra_kwargs = {
108            "user": {"required": False},
109        }

Token Serializer

TokenSerializer(*args, **kwargs)
40    def __init__(self, *args, **kwargs) -> None:
41        super().__init__(*args, **kwargs)
42        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
43            self.fields["key"] = CharField(required=False)
user_obj
def validate_user(self, user: authentik.core.models.User):
45    def validate_user(self, user: User):
46        """Ensure user of token cannot be changed"""
47        if self.instance and self.instance.user_id:
48            if user.pk != self.instance.user_id:
49                raise ValidationError("User cannot be changed")
50        return user

Ensure user of token cannot be changed

def validate(self, attrs: dict[typing.Any, str]) -> dict[typing.Any, str]:
52    def validate(self, attrs: dict[Any, str]) -> dict[Any, str]:
53        """Ensure only API or App password tokens are created."""
54        request: Request = self.context.get("request")
55        if not request:
56            if "user" not in attrs:
57                raise ValidationError("Missing user")
58            if "intent" not in attrs:
59                raise ValidationError("Missing intent")
60        else:
61            attrs.setdefault("user", request.user)
62        attrs.setdefault("intent", TokenIntents.INTENT_API)
63        if attrs.get("intent") not in [TokenIntents.INTENT_API, TokenIntents.INTENT_APP_PASSWORD]:
64            raise ValidationError({"intent": f"Invalid intent {attrs.get('intent')}"})
65
66        if attrs.get("intent") == TokenIntents.INTENT_APP_PASSWORD:
67            # user IS in attrs
68            user: User = attrs.get("user")
69            max_token_lifetime = user.group_attributes(request).get(
70                USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
71            )
72            max_token_lifetime_dt = default_token_duration()
73            if max_token_lifetime is not None:
74                try:
75                    max_token_lifetime_dt = now() + timedelta_from_string(max_token_lifetime)
76                except ValueError:
77                    pass
78
79            expires = attrs.get("expires")
80            if expires is not None and expires > max_token_lifetime_dt:
81                raise ValidationError(
82                    {
83                        "expires": (
84                            f"Token expires exceeds maximum lifetime ({max_token_lifetime_dt} UTC)."
85                        )
86                    }
87                )
88        elif attrs.get("intent") == TokenIntents.INTENT_API:
89            # For API tokens, expires cannot be overridden
90            attrs["expires"] = default_token_duration()
91
92        return attrs

Ensure only API or App password tokens are created.

class TokenSerializer.Meta:
 94    class Meta:
 95        model = Token
 96        fields = [
 97            "pk",
 98            "managed",
 99            "identifier",
100            "intent",
101            "user",
102            "user_obj",
103            "description",
104            "expires",
105            "expiring",
106        ]
107        extra_kwargs = {
108            "user": {"required": False},
109        }
model = <class 'authentik.core.models.Token'>
fields = ['pk', 'managed', 'identifier', 'intent', 'user', 'user_obj', 'description', 'expires', 'expiring']
extra_kwargs = {'user': {'required': False}}
class TokenSetKeySerializer(authentik.core.api.utils.PassiveSerializer):
class TokenSetKeySerializer(PassiveSerializer):
    """Set token's key"""

    # New key value supplied by the caller; required (CharField default).
    key = CharField()

Set token's key

key
class TokenViewSerializer(authentik.core.api.utils.PassiveSerializer):
class TokenViewSerializer(PassiveSerializer):
    """Show token's current key"""

    # Output-only: the key is rendered in responses and never read from input.
    key = CharField(read_only=True)

Show token's current key

key
class TokenViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
class TokenViewSet(UsedByMixin, ModelViewSet):
    """Token Viewset"""

    # Tokens are addressed by their identifier, and expired tokens are included.
    lookup_field = "identifier"
    queryset = Token.objects.including_expired().all()
    serializer_class = TokenSerializer
    search_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
    ]
    filterset_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
        "expires",
        "expiring",
        "managed",
    ]
    ordering = ["identifier", "expires"]
    owner_field = "user"
    rbac_allow_create_without_perm = True

    def perform_create(self, serializer: TokenSerializer):
        """Save a new token, pinning owner and `expiring` for non-superusers.

        Superusers use the inherited `perform_create`. For everyone else the
        token is saved with the requester as `user` and with `expiring` read
        from the requester's USER_ATTRIBUTE_TOKEN_EXPIRING attribute (True when
        unset); `assign_perms_to_managed_role` is then called for
        `authentik_core.view_token_key` on the new token.
        """
        requester = self.request.user
        if requester.is_superuser:
            return super().perform_create(serializer)
        instance = serializer.save(
            user=requester,
            expiring=requester.attributes.get(USER_ATTRIBUTE_TOKEN_EXPIRING, True),
        )
        requester.assign_perms_to_managed_role("authentik_core.view_token_key", instance)
        return instance

    @permission_required("authentik_core.view_token_key")
    @extend_schema(
        responses={
            200: TokenViewSerializer(many=False),
            404: OpenApiResponse(description="Token not found or expired"),
        }
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["GET"])
    def view_key(self, request: Request, identifier: str) -> Response:
        """Return token key and log access"""
        token: Token = self.get_object()
        # The SECRET_VIEW event is created before the key is put into the response.
        Event.new(EventAction.SECRET_VIEW, secret=token).from_http(request)  # noqa # nosec
        payload = TokenViewSerializer({"key": token.key})
        return Response(payload.data)

    @permission_required("authentik_core.set_token_key")
    @extend_schema(
        request=TokenSetKeySerializer(),
        responses={
            204: OpenApiResponse(description="Successfully changed key"),
            400: OpenApiResponse(description="Missing key"),
            404: OpenApiResponse(description="Token not found or expired"),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(TokenSetKeySerializer)
    def set_key(self, request: Request, identifier: str, body: TokenSetKeySerializer) -> Response:
        """Set token key. Action is logged as event. `authentik_core.set_token_key` permission
        is required."""
        token: Token = self.get_object()
        new_key = body.validated_data.get("key")
        if not new_key:
            return Response(status=400)
        token.key = new_key
        token.save()
        # The MODEL_UPDATED event is built from the token state after saving.
        updated_event = Event.new(EventAction.MODEL_UPDATED, model=model_to_dict(token))
        updated_event.from_http(request)  # noqa # nosec
        return Response(status=204)

Token Viewset

lookup_field = 'identifier'
queryset = <QuerySet []>
serializer_class = <class 'TokenSerializer'>
search_fields = ['identifier', 'intent', 'user__username', 'description']
filterset_fields = ['identifier', 'intent', 'user__username', 'description', 'expires', 'expiring', 'managed']
ordering = ['identifier', 'expires']
owner_field = 'user'
rbac_allow_create_without_perm = True
def perform_create(self, serializer: TokenSerializer):
149    def perform_create(self, serializer: TokenSerializer):
150        if not self.request.user.is_superuser:
151            instance = serializer.save(
152                user=self.request.user,
153                expiring=self.request.user.attributes.get(USER_ATTRIBUTE_TOKEN_EXPIRING, True),
154            )
155            self.request.user.assign_perms_to_managed_role(
156                "authentik_core.view_token_key", instance
157            )
158            return instance
159        return super().perform_create(serializer)
@permission_required('authentik_core.view_token_key')
@extend_schema(responses={200: TokenViewSerializer(many=False), 404: OpenApiResponse(description='Token not found or expired')})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['GET'])
def view_key( self, request: rest_framework.request.Request, identifier: str) -> rest_framework.response.Response:
161    @permission_required("authentik_core.view_token_key")
162    @extend_schema(
163        responses={
164            200: TokenViewSerializer(many=False),
165            404: OpenApiResponse(description="Token not found or expired"),
166        }
167    )
168    @action(detail=True, pagination_class=None, filter_backends=[], methods=["GET"])
169    def view_key(self, request: Request, identifier: str) -> Response:
170        """Return token key and log access"""
171        token: Token = self.get_object()
172        Event.new(EventAction.SECRET_VIEW, secret=token).from_http(request)  # noqa # nosec
173        return Response(TokenViewSerializer({"key": token.key}).data)

Return token key and log access

@permission_required('authentik_core.set_token_key')
@extend_schema(request=TokenSetKeySerializer(), responses={204: OpenApiResponse(description='Successfully changed key'), 400: OpenApiResponse(description='Missing key'), 404: OpenApiResponse(description='Token not found or expired')})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(TokenSetKeySerializer)
def set_key( self, request: rest_framework.request.Request, identifier: str, body: TokenSetKeySerializer) -> rest_framework.response.Response:
175    @permission_required("authentik_core.set_token_key")
176    @extend_schema(
177        request=TokenSetKeySerializer(),
178        responses={
179            204: OpenApiResponse(description="Successfully changed key"),
180            400: OpenApiResponse(description="Missing key"),
181            404: OpenApiResponse(description="Token not found or expired"),
182        },
183    )
184    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
185    @validate(TokenSetKeySerializer)
186    def set_key(self, request: Request, identifier: str, body: TokenSetKeySerializer) -> Response:
187        """Set token key. Action is logged as event. `authentik_core.set_token_key` permission
188        is required."""
189        token: Token = self.get_object()
190        key = body.validated_data.get("key")
191        if not key:
192            return Response(status=400)
193        token.key = key
194        token.save()
195        Event.new(EventAction.MODEL_UPDATED, model=model_to_dict(token)).from_http(
196            request
197        )  # noqa # nosec
198        return Response(status=204)

Set token key. Action is logged as event. authentik_core.set_token_key permission is required.

name = None
description = None
suffix = None
detail = None
basename = None