authentik.core.api.tokens
Tokens API Viewset
"""Tokens API Viewset"""

from typing import Any

from django.utils.timezone import now
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework.decorators import action
from rest_framework.exceptions import ValidationError
from rest_framework.fields import CharField
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.viewsets import ModelViewSet

from authentik.api.validation import validate
from authentik.blueprints.api import ManagedSerializer
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.core.api.used_by import UsedByMixin
from authentik.core.api.users import UserSerializer
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.core.models import (
    USER_ATTRIBUTE_TOKEN_EXPIRING,
    USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
    Token,
    TokenIntents,
    User,
    default_token_duration,
)
from authentik.events.models import Event, EventAction
from authentik.events.utils import model_to_dict
from authentik.lib.utils.time import timedelta_from_string
from authentik.rbac.decorators import permission_required


class TokenSerializer(ManagedSerializer, ModelSerializer):
    """Token Serializer"""

    # Read-only nested representation of the token's owner.
    user_obj = UserSerializer(required=False, source="user", read_only=True)

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # The key is only accepted as input when the serializer context carries
        # the blueprint marker; otherwise there is no "key" field at all.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["key"] = CharField(required=False)

    def validate_user(self, user: User):
        """Ensure user of token cannot be changed"""
        existing = self.instance
        # Only an existing token that already has an owner pins the user.
        if existing and existing.user_id and user.pk != existing.user_id:
            raise ValidationError("User cannot be changed")
        return user

    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
        """Ensure only API or App password tokens are created.

        Without a request in the serializer context, both ``user`` and
        ``intent`` must be present in ``attrs``. With a request, the user
        defaults to ``request.user``. The intent defaults to an API token.

        For app passwords, a supplied ``expires`` may not lie beyond the maximum
        lifetime. For API tokens, ``expires`` is always overwritten with
        ``default_token_duration()``.

        Raises:
            ValidationError: if user or intent is missing without a request,
                the intent is neither API nor app password, or ``expires``
                exceeds the maximum lifetime.
        """
        request: Request = self.context.get("request")
        if not request:
            if "user" not in attrs:
                raise ValidationError("Missing user")
            if "intent" not in attrs:
                raise ValidationError("Missing intent")
        else:
            attrs.setdefault("user", request.user)
        attrs.setdefault("intent", TokenIntents.INTENT_API)

        intent = attrs["intent"]
        if intent not in [TokenIntents.INTENT_API, TokenIntents.INTENT_APP_PASSWORD]:
            raise ValidationError({"intent": f"Invalid intent {intent}"})

        if intent == TokenIntents.INTENT_APP_PASSWORD:
            # "user" is guaranteed to be in attrs by the checks above.
            user: User = attrs["user"]
            max_token_lifetime = user.group_attributes(request).get(
                USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
            )
            max_token_lifetime_dt = default_token_duration()
            if max_token_lifetime is not None:
                try:
                    max_token_lifetime_dt = now() + timedelta_from_string(max_token_lifetime)
                except ValueError:
                    # Deliberate best-effort: an unparsable attribute value
                    # leaves the default duration in place.
                    pass

            expires = attrs.get("expires")
            if expires is not None and expires > max_token_lifetime_dt:
                raise ValidationError(
                    {
                        "expires": (
                            f"Token expires exceeds maximum lifetime ({max_token_lifetime_dt} UTC)."
                        )
                    }
                )
        elif intent == TokenIntents.INTENT_API:
            # For API tokens, expires cannot be overridden
            attrs["expires"] = default_token_duration()

        return attrs

    class Meta:
        model = Token
        fields = [
            "pk",
            "managed",
            "identifier",
            "intent",
            "user",
            "user_obj",
            "description",
            "expires",
            "expiring",
        ]
        extra_kwargs = {
            # validate() fills in the user from the request when it is omitted.
            "user": {"required": False},
        }


class TokenSetKeySerializer(PassiveSerializer):
    """Set token's key"""

    key = CharField()


class TokenViewSerializer(PassiveSerializer):
    """Show token's current key"""

    key = CharField(read_only=True)


class TokenViewSet(UsedByMixin, ModelViewSet):
    """Token Viewset"""

    lookup_field = "identifier"
    queryset = Token.objects.including_expired().all()
    serializer_class = TokenSerializer
    search_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
    ]
    filterset_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
        "expires",
        "expiring",
        "managed",
    ]
    ordering = ["identifier", "expires"]
    owner_field = "user"
    rbac_allow_create_without_perm = True

    def perform_create(self, serializer: TokenSerializer):
        """Save a new token; non-superusers always become its owner.

        For non-superusers the token's ``expiring`` flag is taken from the
        user's attributes (default ``True``), and the user is granted
        ``authentik_core.view_token_key`` on the new token.
        """
        requester = self.request.user
        if requester.is_superuser:
            return super().perform_create(serializer)
        instance = serializer.save(
            user=requester,
            expiring=requester.attributes.get(USER_ATTRIBUTE_TOKEN_EXPIRING, True),
        )
        requester.assign_perms_to_managed_role("authentik_core.view_token_key", instance)
        return instance

    @permission_required("authentik_core.view_token_key")
    @extend_schema(
        responses={
            200: TokenViewSerializer(many=False),
            404: OpenApiResponse(description="Token not found or expired"),
        }
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["GET"])
    def view_key(self, request: Request, identifier: str) -> Response:
        """Return token key and log access"""
        token: Token = self.get_object()
        # Record the access before the key is serialized into the response.
        Event.new(EventAction.SECRET_VIEW, secret=token).from_http(request)  # noqa # nosec
        payload = TokenViewSerializer({"key": token.key})
        return Response(payload.data)

    @permission_required("authentik_core.set_token_key")
    @extend_schema(
        request=TokenSetKeySerializer(),
        responses={
            204: OpenApiResponse(description="Successfully changed key"),
            400: OpenApiResponse(description="Missing key"),
            404: OpenApiResponse(description="Token not found or expired"),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(TokenSetKeySerializer)
    def set_key(self, request: Request, identifier: str, body: TokenSetKeySerializer) -> Response:
        """Set token key. Action is logged as event. `authentik_core.set_token_key` permission
        is required."""
        token: Token = self.get_object()
        new_key = body.validated_data.get("key")
        if new_key:
            token.key = new_key
            token.save()
            Event.new(EventAction.MODEL_UPDATED, model=model_to_dict(token)).from_http(
                request
            )  # noqa # nosec
            return Response(status=204)
        # An empty or missing key is rejected without touching the token.
        return Response(status=400)
class
TokenSerializer(authentik.blueprints.api.ManagedSerializer, authentik.core.api.utils.ModelSerializer):
class TokenSerializer(ManagedSerializer, ModelSerializer):
    """Token Serializer"""

    # Read-only nested representation of the token's owner.
    user_obj = UserSerializer(required=False, source="user", read_only=True)

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        # The key is only accepted as input when the serializer context carries
        # the blueprint marker; otherwise there is no "key" field at all.
        if SERIALIZER_CONTEXT_BLUEPRINT in self.context:
            self.fields["key"] = CharField(required=False)

    def validate_user(self, user: User):
        """Ensure user of token cannot be changed"""
        existing = self.instance
        # Only an existing token that already has an owner pins the user.
        if existing and existing.user_id and user.pk != existing.user_id:
            raise ValidationError("User cannot be changed")
        return user

    def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
        """Ensure only API or App password tokens are created.

        Without a request in the serializer context, both ``user`` and
        ``intent`` must be present in ``attrs``. With a request, the user
        defaults to ``request.user``. The intent defaults to an API token.

        For app passwords, a supplied ``expires`` may not lie beyond the maximum
        lifetime. For API tokens, ``expires`` is always overwritten with
        ``default_token_duration()``.

        Raises:
            ValidationError: if user or intent is missing without a request,
                the intent is neither API nor app password, or ``expires``
                exceeds the maximum lifetime.
        """
        request: Request = self.context.get("request")
        if not request:
            if "user" not in attrs:
                raise ValidationError("Missing user")
            if "intent" not in attrs:
                raise ValidationError("Missing intent")
        else:
            attrs.setdefault("user", request.user)
        attrs.setdefault("intent", TokenIntents.INTENT_API)

        intent = attrs["intent"]
        if intent not in [TokenIntents.INTENT_API, TokenIntents.INTENT_APP_PASSWORD]:
            raise ValidationError({"intent": f"Invalid intent {intent}"})

        if intent == TokenIntents.INTENT_APP_PASSWORD:
            # "user" is guaranteed to be in attrs by the checks above.
            user: User = attrs["user"]
            max_token_lifetime = user.group_attributes(request).get(
                USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
            )
            max_token_lifetime_dt = default_token_duration()
            if max_token_lifetime is not None:
                try:
                    max_token_lifetime_dt = now() + timedelta_from_string(max_token_lifetime)
                except ValueError:
                    # Deliberate best-effort: an unparsable attribute value
                    # leaves the default duration in place.
                    pass

            expires = attrs.get("expires")
            if expires is not None and expires > max_token_lifetime_dt:
                raise ValidationError(
                    {
                        "expires": (
                            f"Token expires exceeds maximum lifetime ({max_token_lifetime_dt} UTC)."
                        )
                    }
                )
        elif intent == TokenIntents.INTENT_API:
            # For API tokens, expires cannot be overridden
            attrs["expires"] = default_token_duration()

        return attrs

    class Meta:
        model = Token
        fields = [
            "pk",
            "managed",
            "identifier",
            "intent",
            "user",
            "user_obj",
            "description",
            "expires",
            "expiring",
        ]
        extra_kwargs = {
            # validate() fills in the user from the request when it is omitted.
            "user": {"required": False},
        }
Token Serializer
def validate_user(self, user: User):
    """Ensure user of token cannot be changed"""
    existing = self.instance
    # Only an existing token that already has an owner pins the user; on
    # creation, or for an ownerless token, any user is accepted.
    if existing and existing.user_id and user.pk != existing.user_id:
        raise ValidationError("User cannot be changed")
    return user
Ensure user of token cannot be changed
def
validate(self, attrs: dict[typing.Any, str]) -> dict[typing.Any, str]:
def validate(self, attrs: dict[str, Any]) -> dict[str, Any]:
    """Ensure only API or App password tokens are created.

    Without a request in the serializer context, both ``user`` and ``intent``
    must be present in ``attrs``. With a request, the user defaults to
    ``request.user``. The intent defaults to an API token.

    For app passwords, a supplied ``expires`` may not lie beyond the maximum
    lifetime. For API tokens, ``expires`` is always overwritten with
    ``default_token_duration()``.

    Raises:
        ValidationError: if user or intent is missing without a request, the
            intent is neither API nor app password, or ``expires`` exceeds
            the maximum lifetime.
    """
    request: Request = self.context.get("request")
    if not request:
        if "user" not in attrs:
            raise ValidationError("Missing user")
        if "intent" not in attrs:
            raise ValidationError("Missing intent")
    else:
        attrs.setdefault("user", request.user)
    attrs.setdefault("intent", TokenIntents.INTENT_API)

    # The key is guaranteed to exist after the setdefault above.
    intent = attrs["intent"]
    if intent not in [TokenIntents.INTENT_API, TokenIntents.INTENT_APP_PASSWORD]:
        raise ValidationError({"intent": f"Invalid intent {intent}"})

    if intent == TokenIntents.INTENT_APP_PASSWORD:
        # "user" is guaranteed to be in attrs by the checks above.
        user: User = attrs["user"]
        max_token_lifetime = user.group_attributes(request).get(
            USER_ATTRIBUTE_TOKEN_MAXIMUM_LIFETIME,
        )
        max_token_lifetime_dt = default_token_duration()
        if max_token_lifetime is not None:
            try:
                max_token_lifetime_dt = now() + timedelta_from_string(max_token_lifetime)
            except ValueError:
                # Deliberate best-effort: an unparsable attribute value
                # leaves the default duration in place.
                pass

        expires = attrs.get("expires")
        if expires is not None and expires > max_token_lifetime_dt:
            raise ValidationError(
                {
                    "expires": (
                        f"Token expires exceeds maximum lifetime ({max_token_lifetime_dt} UTC)."
                    )
                }
            )
    elif intent == TokenIntents.INTENT_API:
        # For API tokens, expires cannot be overridden
        attrs["expires"] = default_token_duration()

    return attrs
Ensure only API or App password tokens are created.
class
TokenSerializer.Meta:
class Meta:
    # Serializer options: the backing model and the fields it exposes.
    model = Token
    fields = [
        "pk",
        "managed",
        "identifier",
        "intent",
        "user",
        "user_obj",
        "description",
        "expires",
        "expiring",
    ]
    # "user" may be omitted from the input.
    extra_kwargs = {"user": {"required": False}}
model =
<class 'authentik.core.models.Token'>
class TokenSetKeySerializer(PassiveSerializer):
    """Set token's key"""

    # Request body of the set_key action: the new key value.
    key = CharField()
Set token's key
Inherited Members
class TokenViewSerializer(PassiveSerializer):
    """Show token's current key"""

    # Response body of the view_key action; output only.
    key = CharField(read_only=True)
Show token's current key
Inherited Members
class TokenViewSet(UsedByMixin, ModelViewSet):
    """Token Viewset"""

    # Tokens are addressed by identifier; the queryset includes expired ones.
    lookup_field = "identifier"
    queryset = Token.objects.including_expired().all()
    serializer_class = TokenSerializer
    search_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
    ]
    filterset_fields = [
        "identifier",
        "intent",
        "user__username",
        "description",
        "expires",
        "expiring",
        "managed",
    ]
    ordering = ["identifier", "expires"]
    owner_field = "user"
    rbac_allow_create_without_perm = True

    def perform_create(self, serializer: TokenSerializer):
        """Save a new token; non-superusers always become its owner.

        For non-superusers the token's ``expiring`` flag is taken from the
        user's attributes (default ``True``), and the user is granted
        ``authentik_core.view_token_key`` on the new token.
        """
        requester = self.request.user
        if requester.is_superuser:
            return super().perform_create(serializer)
        instance = serializer.save(
            user=requester,
            expiring=requester.attributes.get(USER_ATTRIBUTE_TOKEN_EXPIRING, True),
        )
        requester.assign_perms_to_managed_role("authentik_core.view_token_key", instance)
        return instance

    @permission_required("authentik_core.view_token_key")
    @extend_schema(
        responses={
            200: TokenViewSerializer(many=False),
            404: OpenApiResponse(description="Token not found or expired"),
        }
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["GET"])
    def view_key(self, request: Request, identifier: str) -> Response:
        """Return token key and log access"""
        token: Token = self.get_object()
        # Record the access before the key is serialized into the response.
        Event.new(EventAction.SECRET_VIEW, secret=token).from_http(request)  # noqa # nosec
        payload = TokenViewSerializer({"key": token.key})
        return Response(payload.data)

    @permission_required("authentik_core.set_token_key")
    @extend_schema(
        request=TokenSetKeySerializer(),
        responses={
            204: OpenApiResponse(description="Successfully changed key"),
            400: OpenApiResponse(description="Missing key"),
            404: OpenApiResponse(description="Token not found or expired"),
        },
    )
    @action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
    @validate(TokenSetKeySerializer)
    def set_key(self, request: Request, identifier: str, body: TokenSetKeySerializer) -> Response:
        """Set token key. Action is logged as event. `authentik_core.set_token_key` permission
        is required."""
        token: Token = self.get_object()
        new_key = body.validated_data.get("key")
        if new_key:
            token.key = new_key
            token.save()
            Event.new(EventAction.MODEL_UPDATED, model=model_to_dict(token)).from_http(
                request
            )  # noqa # nosec
            return Response(status=204)
        # An empty or missing key is rejected without touching the token.
        return Response(status=400)
Token Viewset
serializer_class =
<class 'TokenSerializer'>
filterset_fields =
['identifier', 'intent', 'user__username', 'description', 'expires', 'expiring', 'managed']
def perform_create(self, serializer: TokenSerializer):
    """Save a new token; non-superusers always become its owner.

    For non-superusers the token's ``expiring`` flag is taken from the user's
    attributes (default ``True``), and the user is granted
    ``authentik_core.view_token_key`` on the new token.
    """
    requester = self.request.user
    # Superusers go through the stock create path unchanged.
    if requester.is_superuser:
        return super().perform_create(serializer)
    instance = serializer.save(
        user=requester,
        expiring=requester.attributes.get(USER_ATTRIBUTE_TOKEN_EXPIRING, True),
    )
    requester.assign_perms_to_managed_role("authentik_core.view_token_key", instance)
    return instance
@extend_schema(responses={200: TokenViewSerializer(many=False), 404: OpenApiResponse(description='Token not found or expired')})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['GET'])
def
view_key( self, request: rest_framework.request.Request, identifier: str) -> rest_framework.response.Response:
@permission_required("authentik_core.view_token_key")
@extend_schema(
    responses={
        200: TokenViewSerializer(many=False),
        404: OpenApiResponse(description="Token not found or expired"),
    }
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["GET"])
def view_key(self, request: Request, identifier: str) -> Response:
    """Return token key and log access"""
    token: Token = self.get_object()
    # Record the access before the key is serialized into the response.
    Event.new(EventAction.SECRET_VIEW, secret=token).from_http(request)  # noqa # nosec
    payload = TokenViewSerializer({"key": token.key})
    return Response(payload.data)
Return token key and log access
@extend_schema(request=TokenSetKeySerializer(), responses={204: OpenApiResponse(description='Successfully changed key'), 400: OpenApiResponse(description='Missing key'), 404: OpenApiResponse(description='Token not found or expired')})
@action(detail=True, pagination_class=None, filter_backends=[], methods=['POST'])
@validate(TokenSetKeySerializer)
def
set_key( self, request: rest_framework.request.Request, identifier: str, body: TokenSetKeySerializer) -> rest_framework.response.Response:
@permission_required("authentik_core.set_token_key")
@extend_schema(
    request=TokenSetKeySerializer(),
    responses={
        204: OpenApiResponse(description="Successfully changed key"),
        400: OpenApiResponse(description="Missing key"),
        404: OpenApiResponse(description="Token not found or expired"),
    },
)
@action(detail=True, pagination_class=None, filter_backends=[], methods=["POST"])
@validate(TokenSetKeySerializer)
def set_key(self, request: Request, identifier: str, body: TokenSetKeySerializer) -> Response:
    """Set token key. Action is logged as event. `authentik_core.set_token_key` permission
    is required."""
    token: Token = self.get_object()
    new_key = body.validated_data.get("key")
    if new_key:
        token.key = new_key
        token.save()
        # Log the update together with a dict snapshot of the token.
        Event.new(EventAction.MODEL_UPDATED, model=model_to_dict(token)).from_http(
            request
        )  # noqa # nosec
        return Response(status=204)
    # An empty or missing key is rejected without touching the token.
    return Response(status=400)
Set token key. Action is logged as event. authentik_core.set_token_key permission
is required.