authentik.core.api.authenticated_sessions
AuthenticatedSessions API Viewset
1"""AuthenticatedSessions API Viewset""" 2 3from typing import TypedDict 4 5from drf_spectacular.utils import ( 6 extend_schema, 7 inline_serializer, 8) 9from rest_framework import mixins, serializers 10from rest_framework.decorators import action 11from rest_framework.fields import SerializerMethodField 12from rest_framework.request import Request 13from rest_framework.response import Response 14from rest_framework.serializers import ( 15 CharField, 16 DateTimeField, 17 IPAddressField, 18 ListField, 19) 20from rest_framework.viewsets import GenericViewSet 21from ua_parser import user_agent_parser 22 23from authentik.api.validation import validate 24from authentik.core.api.used_by import UsedByMixin 25from authentik.core.api.utils import ModelSerializer, PassiveSerializer 26from authentik.core.models import AuthenticatedSession 27from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR, ASNDict 28from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR, GeoIPDict 29from authentik.rbac.decorators import permission_required 30 31 32class UserAgentDeviceDict(TypedDict): 33 """User agent device""" 34 35 brand: str 36 family: str 37 model: str 38 39 40class UserAgentOSDict(TypedDict): 41 """User agent os""" 42 43 family: str 44 major: str 45 minor: str 46 patch: str 47 patch_minor: str 48 49 50class UserAgentBrowserDict(TypedDict): 51 """User agent browser""" 52 53 family: str 54 major: str 55 minor: str 56 patch: str 57 58 59class UserAgentDict(TypedDict): 60 """User agent details""" 61 62 device: UserAgentDeviceDict 63 os: UserAgentOSDict 64 user_agent: UserAgentBrowserDict 65 string: str 66 67 68class BulkDeleteSessionSerializer(PassiveSerializer): 69 """Serializer for bulk deleting authenticated sessions by user""" 70 71 user_pks = ListField( 72 child=serializers.IntegerField(), help_text="List of user IDs to revoke all sessions for" 73 ) 74 75 76class AuthenticatedSessionSerializer(ModelSerializer): 77 """AuthenticatedSession Serializer""" 78 79 expires = DateTimeField(source="session.expires", read_only=True) 80 last_ip = IPAddressField(source="session.last_ip", read_only=True) 81 last_user_agent = CharField(source="session.last_user_agent", read_only=True) 82 last_used = DateTimeField(source="session.last_used", read_only=True) 83 84 current = SerializerMethodField() 85 user_agent = SerializerMethodField() 86 geo_ip = SerializerMethodField() 87 asn = SerializerMethodField() 88 89 def get_current(self, instance: AuthenticatedSession) -> bool: 90 """Check if session is currently active session""" 91 request: Request = self.context["request"] 92 return request._request.session.session_key == instance.session.session_key 93 94 def get_user_agent(self, instance: AuthenticatedSession) -> UserAgentDict: 95 """Get parsed user agent""" 96 return user_agent_parser.Parse(instance.session.last_user_agent) 97 98 def get_geo_ip(self, instance: AuthenticatedSession) -> GeoIPDict | None: # pragma: no cover 99 """Get GeoIP Data""" 100 return GEOIP_CONTEXT_PROCESSOR.city_dict(instance.session.last_ip) 101 102 def get_asn(self, instance: AuthenticatedSession) -> ASNDict | None: # pragma: no cover 103 """Get ASN Data""" 104 return ASN_CONTEXT_PROCESSOR.asn_dict(instance.session.last_ip) 105 106 class Meta: 107 model = AuthenticatedSession 108 fields = [ 109 "uuid", 110 "current", 111 "user_agent", 112 "geo_ip", 113 "asn", 114 "user", 115 "last_ip", 116 "last_user_agent", 117 "last_used", 118 "expires", 119 ] 120 extra_args = {"uuid": {"read_only": True}} 121 122 123class AuthenticatedSessionViewSet( 124 mixins.RetrieveModelMixin, 125 mixins.DestroyModelMixin, 126 UsedByMixin, 127 mixins.ListModelMixin, 128 GenericViewSet, 129): 130 """AuthenticatedSession Viewset""" 131 132 lookup_field = "uuid" 133 queryset = AuthenticatedSession.objects.select_related("session").all() 134 serializer_class = AuthenticatedSessionSerializer 135 search_fields = ["user__username", "session__last_ip", "session__last_user_agent"] 136 filterset_fields = ["user__username", "session__last_ip", "session__last_user_agent"] 137 ordering = ["user__username"] 138 owner_field = "user" 139 140 @permission_required("authentik_core.delete_authenticatedsession") 141 @extend_schema( 142 parameters=[BulkDeleteSessionSerializer], 143 responses={ 144 200: inline_serializer( 145 "BulkDeleteSessionResponse", 146 {"deleted": serializers.IntegerField()}, 147 ), 148 }, 149 ) 150 @validate(BulkDeleteSessionSerializer, location="query") 151 @action(detail=False, methods=["DELETE"], pagination_class=None, filter_backends=[]) 152 def bulk_delete(self, request: Request, *, query: BulkDeleteSessionSerializer) -> Response: 153 """Bulk revoke all sessions for multiple users""" 154 user_pks = query.validated_data.get("user_pks", []) 155 deleted_count, _ = AuthenticatedSession.objects.filter(user_id__in=user_pks).delete() 156 157 return Response({"deleted": deleted_count}, status=200)
class
UserAgentDeviceDict(typing.TypedDict):
33class UserAgentDeviceDict(TypedDict): 34 """User agent device""" 35 36 brand: str 37 family: str 38 model: str
User agent device
class
UserAgentOSDict(typing.TypedDict):
41class UserAgentOSDict(TypedDict): 42 """User agent os""" 43 44 family: str 45 major: str 46 minor: str 47 patch: str 48 patch_minor: str
User agent os
class
UserAgentBrowserDict(typing.TypedDict):
51class UserAgentBrowserDict(TypedDict): 52 """User agent browser""" 53 54 family: str 55 major: str 56 minor: str 57 patch: str
User agent browser
class
UserAgentDict(typing.TypedDict):
60class UserAgentDict(TypedDict): 61 """User agent details""" 62 63 device: UserAgentDeviceDict 64 os: UserAgentOSDict 65 user_agent: UserAgentBrowserDict 66 string: str
User agent details
device: UserAgentDeviceDict
os: UserAgentOSDict
user_agent: UserAgentBrowserDict
69class BulkDeleteSessionSerializer(PassiveSerializer): 70 """Serializer for bulk deleting authenticated sessions by user""" 71 72 user_pks = ListField( 73 child=serializers.IntegerField(), help_text="List of user IDs to revoke all sessions for" 74 )
Serializer for bulk deleting authenticated sessions by user
Inherited Members
77class AuthenticatedSessionSerializer(ModelSerializer): 78 """AuthenticatedSession Serializer""" 79 80 expires = DateTimeField(source="session.expires", read_only=True) 81 last_ip = IPAddressField(source="session.last_ip", read_only=True) 82 last_user_agent = CharField(source="session.last_user_agent", read_only=True) 83 last_used = DateTimeField(source="session.last_used", read_only=True) 84 85 current = SerializerMethodField() 86 user_agent = SerializerMethodField() 87 geo_ip = SerializerMethodField() 88 asn = SerializerMethodField() 89 90 def get_current(self, instance: AuthenticatedSession) -> bool: 91 """Check if session is currently active session""" 92 request: Request = self.context["request"] 93 return request._request.session.session_key == instance.session.session_key 94 95 def get_user_agent(self, instance: AuthenticatedSession) -> UserAgentDict: 96 """Get parsed user agent""" 97 return user_agent_parser.Parse(instance.session.last_user_agent) 98 99 def get_geo_ip(self, instance: AuthenticatedSession) -> GeoIPDict | None: # pragma: no cover 100 """Get GeoIP Data""" 101 return GEOIP_CONTEXT_PROCESSOR.city_dict(instance.session.last_ip) 102 103 def get_asn(self, instance: AuthenticatedSession) -> ASNDict | None: # pragma: no cover 104 """Get ASN Data""" 105 return ASN_CONTEXT_PROCESSOR.asn_dict(instance.session.last_ip) 106 107 class Meta: 108 model = AuthenticatedSession 109 fields = [ 110 "uuid", 111 "current", 112 "user_agent", 113 "geo_ip", 114 "asn", 115 "user", 116 "last_ip", 117 "last_user_agent", 118 "last_used", 119 "expires", 120 ] 121 extra_args = {"uuid": {"read_only": True}}
AuthenticatedSession Serializer
90 def get_current(self, instance: AuthenticatedSession) -> bool: 91 """Check if session is currently active session""" 92 request: Request = self.context["request"] 93 return request._request.session.session_key == instance.session.session_key
Check if session is currently active session
95 def get_user_agent(self, instance: AuthenticatedSession) -> UserAgentDict: 96 """Get parsed user agent""" 97 return user_agent_parser.Parse(instance.session.last_user_agent)
Get parsed user agent
def
get_geo_ip( self, instance: authentik.core.models.AuthenticatedSession) -> authentik.events.context_processors.geoip.GeoIPDict | None:
99 def get_geo_ip(self, instance: AuthenticatedSession) -> GeoIPDict | None: # pragma: no cover 100 """Get GeoIP Data""" 101 return GEOIP_CONTEXT_PROCESSOR.city_dict(instance.session.last_ip)
Get GeoIP Data
def
get_asn( self, instance: authentik.core.models.AuthenticatedSession) -> authentik.events.context_processors.asn.ASNDict | None:
103 def get_asn(self, instance: AuthenticatedSession) -> ASNDict | None: # pragma: no cover 104 """Get ASN Data""" 105 return ASN_CONTEXT_PROCESSOR.asn_dict(instance.session.last_ip)
Get ASN Data
Inherited Members
class
AuthenticatedSessionSerializer.Meta:
107 class Meta: 108 model = AuthenticatedSession 109 fields = [ 110 "uuid", 111 "current", 112 "user_agent", 113 "geo_ip", 114 "asn", 115 "user", 116 "last_ip", 117 "last_user_agent", 118 "last_used", 119 "expires", 120 ] 121 extra_args = {"uuid": {"read_only": True}}
model =
<class 'authentik.core.models.AuthenticatedSession'>
class
AuthenticatedSessionViewSet(rest_framework.mixins.RetrieveModelMixin, rest_framework.mixins.DestroyModelMixin, authentik.core.api.used_by.UsedByMixin, rest_framework.mixins.ListModelMixin, rest_framework.viewsets.GenericViewSet):
124class AuthenticatedSessionViewSet( 125 mixins.RetrieveModelMixin, 126 mixins.DestroyModelMixin, 127 UsedByMixin, 128 mixins.ListModelMixin, 129 GenericViewSet, 130): 131 """AuthenticatedSession Viewset""" 132 133 lookup_field = "uuid" 134 queryset = AuthenticatedSession.objects.select_related("session").all() 135 serializer_class = AuthenticatedSessionSerializer 136 search_fields = ["user__username", "session__last_ip", "session__last_user_agent"] 137 filterset_fields = ["user__username", "session__last_ip", "session__last_user_agent"] 138 ordering = ["user__username"] 139 owner_field = "user" 140 141 @permission_required("authentik_core.delete_authenticatedsession") 142 @extend_schema( 143 parameters=[BulkDeleteSessionSerializer], 144 responses={ 145 200: inline_serializer( 146 "BulkDeleteSessionResponse", 147 {"deleted": serializers.IntegerField()}, 148 ), 149 }, 150 ) 151 @validate(BulkDeleteSessionSerializer, location="query") 152 @action(detail=False, methods=["DELETE"], pagination_class=None, filter_backends=[]) 153 def bulk_delete(self, request: Request, *, query: BulkDeleteSessionSerializer) -> Response: 154 """Bulk revoke all sessions for multiple users""" 155 user_pks = query.validated_data.get("user_pks", []) 156 deleted_count, _ = AuthenticatedSession.objects.filter(user_id__in=user_pks).delete() 157 158 return Response({"deleted": deleted_count}, status=200)
AuthenticatedSession Viewset
serializer_class =
<class 'AuthenticatedSessionSerializer'>
@extend_schema(parameters=[BulkDeleteSessionSerializer], responses={200: inline_serializer('BulkDeleteSessionResponse', {'deleted': serializers.IntegerField()})})
@validate(BulkDeleteSessionSerializer, location='query')
@action(detail=False, methods=['DELETE'], pagination_class=None, filter_backends=[])
def
bulk_delete( self, request: rest_framework.request.Request, *, query: BulkDeleteSessionSerializer) -> rest_framework.response.Response:
141 @permission_required("authentik_core.delete_authenticatedsession") 142 @extend_schema( 143 parameters=[BulkDeleteSessionSerializer], 144 responses={ 145 200: inline_serializer( 146 "BulkDeleteSessionResponse", 147 {"deleted": serializers.IntegerField()}, 148 ), 149 }, 150 ) 151 @validate(BulkDeleteSessionSerializer, location="query") 152 @action(detail=False, methods=["DELETE"], pagination_class=None, filter_backends=[]) 153 def bulk_delete(self, request: Request, *, query: BulkDeleteSessionSerializer) -> Response: 154 """Bulk revoke all sessions for multiple users""" 155 user_pks = query.validated_data.get("user_pks", []) 156 deleted_count, _ = AuthenticatedSession.objects.filter(user_id__in=user_pks).delete() 157 158 return Response({"deleted": deleted_count}, status=200)
Bulk revoke all sessions for multiple users