authentik.core.api.authenticated_sessions

AuthenticatedSessions API Viewset

  1"""AuthenticatedSessions API Viewset"""
  2
  3from typing import TypedDict
  4
  5from drf_spectacular.utils import (
  6    extend_schema,
  7    inline_serializer,
  8)
  9from rest_framework import mixins, serializers
 10from rest_framework.decorators import action
 11from rest_framework.fields import SerializerMethodField
 12from rest_framework.request import Request
 13from rest_framework.response import Response
 14from rest_framework.serializers import (
 15    CharField,
 16    DateTimeField,
 17    IPAddressField,
 18    ListField,
 19)
 20from rest_framework.viewsets import GenericViewSet
 21from ua_parser import user_agent_parser
 22
 23from authentik.api.validation import validate
 24from authentik.core.api.used_by import UsedByMixin
 25from authentik.core.api.utils import ModelSerializer, PassiveSerializer
 26from authentik.core.models import AuthenticatedSession
 27from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR, ASNDict
 28from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR, GeoIPDict
 29from authentik.rbac.decorators import permission_required
 30
 31
class UserAgentDeviceDict(TypedDict):
    """User agent device"""

    # Type of the ``device`` entry of ``UserAgentDict``.
    # NOTE(review): presumably mirrors the ``device`` section of the ua-parser
    # ``Parse()`` result; that parser may yield ``None`` for values it cannot
    # determine -- confirm whether ``str | None`` would be more accurate.
    brand: str
    family: str
    model: str
 38
 39
class UserAgentOSDict(TypedDict):
    """User agent os"""

    # Type of the ``os`` entry of ``UserAgentDict``.
    # NOTE(review): presumably the operating-system family followed by its
    # version components as reported by ua-parser; components the parser cannot
    # determine may be ``None`` -- confirm against the parser output.
    family: str
    major: str
    minor: str
    patch: str
    patch_minor: str
 48
 49
class UserAgentBrowserDict(TypedDict):
    """User agent browser"""

    # Type of the ``user_agent`` entry of ``UserAgentDict``.
    # NOTE(review): presumably the browser family followed by its version
    # components as reported by ua-parser; components the parser cannot
    # determine may be ``None`` -- confirm against the parser output.
    family: str
    major: str
    minor: str
    patch: str
 57
 58
class UserAgentDict(TypedDict):
    """User agent details"""

    # Annotated return type of ``AuthenticatedSessionSerializer.get_user_agent``,
    # which returns the result of ``user_agent_parser.Parse()`` unchanged.
    device: UserAgentDeviceDict
    os: UserAgentOSDict
    user_agent: UserAgentBrowserDict
    # NOTE(review): presumably the raw user-agent string that was parsed -- confirm.
    string: str
 66
 67
 68class BulkDeleteSessionSerializer(PassiveSerializer):
 69    """Serializer for bulk deleting authenticated sessions by user"""
 70
 71    user_pks = ListField(
 72        child=serializers.IntegerField(), help_text="List of user IDs to revoke all sessions for"
 73    )
 74
 75
 76class AuthenticatedSessionSerializer(ModelSerializer):
 77    """AuthenticatedSession Serializer"""
 78
 79    expires = DateTimeField(source="session.expires", read_only=True)
 80    last_ip = IPAddressField(source="session.last_ip", read_only=True)
 81    last_user_agent = CharField(source="session.last_user_agent", read_only=True)
 82    last_used = DateTimeField(source="session.last_used", read_only=True)
 83
 84    current = SerializerMethodField()
 85    user_agent = SerializerMethodField()
 86    geo_ip = SerializerMethodField()
 87    asn = SerializerMethodField()
 88
 89    def get_current(self, instance: AuthenticatedSession) -> bool:
 90        """Check if session is currently active session"""
 91        request: Request = self.context["request"]
 92        return request._request.session.session_key == instance.session.session_key
 93
 94    def get_user_agent(self, instance: AuthenticatedSession) -> UserAgentDict:
 95        """Get parsed user agent"""
 96        return user_agent_parser.Parse(instance.session.last_user_agent)
 97
 98    def get_geo_ip(self, instance: AuthenticatedSession) -> GeoIPDict | None:  # pragma: no cover
 99        """Get GeoIP Data"""
100        return GEOIP_CONTEXT_PROCESSOR.city_dict(instance.session.last_ip)
101
102    def get_asn(self, instance: AuthenticatedSession) -> ASNDict | None:  # pragma: no cover
103        """Get ASN Data"""
104        return ASN_CONTEXT_PROCESSOR.asn_dict(instance.session.last_ip)
105
106    class Meta:
107        model = AuthenticatedSession
108        fields = [
109            "uuid",
110            "current",
111            "user_agent",
112            "geo_ip",
113            "asn",
114            "user",
115            "last_ip",
116            "last_user_agent",
117            "last_used",
118            "expires",
119        ]
120        extra_args = {"uuid": {"read_only": True}}
121
122
class AuthenticatedSessionViewSet(
    mixins.RetrieveModelMixin,
    mixins.DestroyModelMixin,
    UsedByMixin,
    mixins.ListModelMixin,
    GenericViewSet,
):
    """AuthenticatedSession Viewset"""

    # Retrieve, destroy and list only: no create/update mixins are included.
    serializer_class = AuthenticatedSessionSerializer
    # ``session`` is joined up front because the serializer reads several of its fields.
    queryset = AuthenticatedSession.objects.select_related("session").all()
    lookup_field = "uuid"
    owner_field = "user"
    ordering = ["user__username"]
    search_fields = ["user__username", "session__last_ip", "session__last_user_agent"]
    filterset_fields = ["user__username", "session__last_ip", "session__last_user_agent"]

    @permission_required("authentik_core.delete_authenticatedsession")
    @extend_schema(
        parameters=[BulkDeleteSessionSerializer],
        responses={
            200: inline_serializer(
                "BulkDeleteSessionResponse",
                {"deleted": serializers.IntegerField()},
            ),
        },
    )
    @validate(BulkDeleteSessionSerializer, location="query")
    @action(detail=False, methods=["DELETE"], pagination_class=None, filter_backends=[])
    def bulk_delete(self, request: Request, *, query: BulkDeleteSessionSerializer) -> Response:
        """Bulk revoke all sessions for multiple users"""
        user_ids = query.validated_data.get("user_pks", [])
        matching_sessions = AuthenticatedSession.objects.filter(user_id__in=user_ids)
        # ``QuerySet.delete()`` returns ``(total, per_model_counts)``; only the total is reported.
        # NOTE(review): the total counts every row Django removed, including any
        # cascaded rows -- confirm that matches the intended meaning of "deleted".
        removed_total = matching_sessions.delete()[0]

        return Response({"deleted": removed_total}, status=200)
class UserAgentDeviceDict(typing.TypedDict):
class UserAgentDeviceDict(TypedDict):
    """User agent device"""

    # Type of the ``device`` entry of ``UserAgentDict``.
    # NOTE(review): presumably mirrors the ``device`` section of the ua-parser
    # ``Parse()`` result; that parser may yield ``None`` for values it cannot
    # determine -- confirm whether ``str | None`` would be more accurate.
    brand: str
    family: str
    model: str

User agent device

brand: str
family: str
model: str
class UserAgentOSDict(typing.TypedDict):
class UserAgentOSDict(TypedDict):
    """User agent os"""

    # Type of the ``os`` entry of ``UserAgentDict``.
    # NOTE(review): presumably the operating-system family followed by its
    # version components as reported by ua-parser; components the parser cannot
    # determine may be ``None`` -- confirm against the parser output.
    family: str
    major: str
    minor: str
    patch: str
    patch_minor: str

User agent os

family: str
major: str
minor: str
patch: str
patch_minor: str
class UserAgentBrowserDict(typing.TypedDict):
class UserAgentBrowserDict(TypedDict):
    """User agent browser"""

    # Type of the ``user_agent`` entry of ``UserAgentDict``.
    # NOTE(review): presumably the browser family followed by its version
    # components as reported by ua-parser; components the parser cannot
    # determine may be ``None`` -- confirm against the parser output.
    family: str
    major: str
    minor: str
    patch: str

User agent browser

family: str
major: str
minor: str
patch: str
class UserAgentDict(typing.TypedDict):
class UserAgentDict(TypedDict):
    """User agent details"""

    # Annotated return type of ``AuthenticatedSessionSerializer.get_user_agent``,
    # which returns the result of ``user_agent_parser.Parse()`` unchanged.
    device: UserAgentDeviceDict
    os: UserAgentOSDict
    user_agent: UserAgentBrowserDict
    # NOTE(review): presumably the raw user-agent string that was parsed -- confirm.
    string: str

User agent details

user_agent: UserAgentBrowserDict
string: str
class BulkDeleteSessionSerializer(authentik.core.api.utils.PassiveSerializer):
class BulkDeleteSessionSerializer(PassiveSerializer):
    """Serializer for bulk deleting authenticated sessions by user"""

    # Validated against the request's query parameters by
    # ``AuthenticatedSessionViewSet.bulk_delete`` (``location="query"``).
    user_pks = ListField(
        help_text="List of user IDs to revoke all sessions for",
        child=serializers.IntegerField(),
    )

Serializer for bulk deleting authenticated sessions by user

user_pks
class AuthenticatedSessionSerializer(authentik.core.api.utils.ModelSerializer):
 77class AuthenticatedSessionSerializer(ModelSerializer):
 78    """AuthenticatedSession Serializer"""
 79
 80    expires = DateTimeField(source="session.expires", read_only=True)
 81    last_ip = IPAddressField(source="session.last_ip", read_only=True)
 82    last_user_agent = CharField(source="session.last_user_agent", read_only=True)
 83    last_used = DateTimeField(source="session.last_used", read_only=True)
 84
 85    current = SerializerMethodField()
 86    user_agent = SerializerMethodField()
 87    geo_ip = SerializerMethodField()
 88    asn = SerializerMethodField()
 89
 90    def get_current(self, instance: AuthenticatedSession) -> bool:
 91        """Check if session is currently active session"""
 92        request: Request = self.context["request"]
 93        return request._request.session.session_key == instance.session.session_key
 94
 95    def get_user_agent(self, instance: AuthenticatedSession) -> UserAgentDict:
 96        """Get parsed user agent"""
 97        return user_agent_parser.Parse(instance.session.last_user_agent)
 98
 99    def get_geo_ip(self, instance: AuthenticatedSession) -> GeoIPDict | None:  # pragma: no cover
100        """Get GeoIP Data"""
101        return GEOIP_CONTEXT_PROCESSOR.city_dict(instance.session.last_ip)
102
103    def get_asn(self, instance: AuthenticatedSession) -> ASNDict | None:  # pragma: no cover
104        """Get ASN Data"""
105        return ASN_CONTEXT_PROCESSOR.asn_dict(instance.session.last_ip)
106
107    class Meta:
108        model = AuthenticatedSession
109        fields = [
110            "uuid",
111            "current",
112            "user_agent",
113            "geo_ip",
114            "asn",
115            "user",
116            "last_ip",
117            "last_user_agent",
118            "last_used",
119            "expires",
120        ]
121        extra_args = {"uuid": {"read_only": True}}

AuthenticatedSession Serializer

expires
last_ip
last_user_agent
last_used
current
user_agent
geo_ip
asn
def get_current(self, instance: authentik.core.models.AuthenticatedSession) -> bool:
90    def get_current(self, instance: AuthenticatedSession) -> bool:
91        """Check if session is currently active session"""
92        request: Request = self.context["request"]
93        return request._request.session.session_key == instance.session.session_key

Check if session is currently active session

def get_user_agent( self, instance: authentik.core.models.AuthenticatedSession) -> UserAgentDict:
95    def get_user_agent(self, instance: AuthenticatedSession) -> UserAgentDict:
96        """Get parsed user agent"""
97        return user_agent_parser.Parse(instance.session.last_user_agent)

Get parsed user agent

 99    def get_geo_ip(self, instance: AuthenticatedSession) -> GeoIPDict | None:  # pragma: no cover
100        """Get GeoIP Data"""
101        return GEOIP_CONTEXT_PROCESSOR.city_dict(instance.session.last_ip)

Get GeoIP Data

103    def get_asn(self, instance: AuthenticatedSession) -> ASNDict | None:  # pragma: no cover
104        """Get ASN Data"""
105        return ASN_CONTEXT_PROCESSOR.asn_dict(instance.session.last_ip)

Get ASN Data

class AuthenticatedSessionSerializer.Meta:
107    class Meta:
108        model = AuthenticatedSession
109        fields = [
110            "uuid",
111            "current",
112            "user_agent",
113            "geo_ip",
114            "asn",
115            "user",
116            "last_ip",
117            "last_user_agent",
118            "last_used",
119            "expires",
120        ]
121        extra_args = {"uuid": {"read_only": True}}
fields = ['uuid', 'current', 'user_agent', 'geo_ip', 'asn', 'user', 'last_ip', 'last_user_agent', 'last_used', 'expires']
extra_args = {'uuid': {'read_only': True}}
class AuthenticatedSessionViewSet(rest_framework.mixins.RetrieveModelMixin, rest_framework.mixins.DestroyModelMixin, authentik.core.api.used_by.UsedByMixin, rest_framework.mixins.ListModelMixin, rest_framework.viewsets.GenericViewSet):
class AuthenticatedSessionViewSet(
    mixins.RetrieveModelMixin,
    mixins.DestroyModelMixin,
    UsedByMixin,
    mixins.ListModelMixin,
    GenericViewSet,
):
    """AuthenticatedSession Viewset"""

    # Retrieve, destroy and list only: no create/update mixins are included.
    serializer_class = AuthenticatedSessionSerializer
    # ``session`` is joined up front because the serializer reads several of its fields.
    queryset = AuthenticatedSession.objects.select_related("session").all()
    lookup_field = "uuid"
    owner_field = "user"
    ordering = ["user__username"]
    search_fields = ["user__username", "session__last_ip", "session__last_user_agent"]
    filterset_fields = ["user__username", "session__last_ip", "session__last_user_agent"]

    @permission_required("authentik_core.delete_authenticatedsession")
    @extend_schema(
        parameters=[BulkDeleteSessionSerializer],
        responses={
            200: inline_serializer(
                "BulkDeleteSessionResponse",
                {"deleted": serializers.IntegerField()},
            ),
        },
    )
    @validate(BulkDeleteSessionSerializer, location="query")
    @action(detail=False, methods=["DELETE"], pagination_class=None, filter_backends=[])
    def bulk_delete(self, request: Request, *, query: BulkDeleteSessionSerializer) -> Response:
        """Bulk revoke all sessions for multiple users"""
        user_ids = query.validated_data.get("user_pks", [])
        matching_sessions = AuthenticatedSession.objects.filter(user_id__in=user_ids)
        # ``QuerySet.delete()`` returns ``(total, per_model_counts)``; only the total is reported.
        # NOTE(review): the total counts every row Django removed, including any
        # cascaded rows -- confirm that matches the intended meaning of "deleted".
        removed_total = matching_sessions.delete()[0]

        return Response({"deleted": removed_total}, status=200)

AuthenticatedSession Viewset

lookup_field = 'uuid'
queryset = <QuerySet []>
serializer_class = <class 'AuthenticatedSessionSerializer'>
search_fields = ['user__username', 'session__last_ip', 'session__last_user_agent']
filterset_fields = ['user__username', 'session__last_ip', 'session__last_user_agent']
ordering = ['user__username']
owner_field = 'user'
@permission_required('authentik_core.delete_authenticatedsession')
@extend_schema(parameters=[BulkDeleteSessionSerializer], responses={200: inline_serializer('BulkDeleteSessionResponse', {'deleted': serializers.IntegerField()})})
@validate(BulkDeleteSessionSerializer, location='query')
@action(detail=False, methods=['DELETE'], pagination_class=None, filter_backends=[])
def bulk_delete( self, request: rest_framework.request.Request, *, query: BulkDeleteSessionSerializer) -> rest_framework.response.Response:
141    @permission_required("authentik_core.delete_authenticatedsession")
142    @extend_schema(
143        parameters=[BulkDeleteSessionSerializer],
144        responses={
145            200: inline_serializer(
146                "BulkDeleteSessionResponse",
147                {"deleted": serializers.IntegerField()},
148            ),
149        },
150    )
151    @validate(BulkDeleteSessionSerializer, location="query")
152    @action(detail=False, methods=["DELETE"], pagination_class=None, filter_backends=[])
153    def bulk_delete(self, request: Request, *, query: BulkDeleteSessionSerializer) -> Response:
154        """Bulk revoke all sessions for multiple users"""
155        user_pks = query.validated_data.get("user_pks", [])
156        deleted_count, _ = AuthenticatedSession.objects.filter(user_id__in=user_pks).delete()
157
158        return Response({"deleted": deleted_count}, status=200)

Bulk revoke all sessions for multiple users

name = None
description = None
suffix = None
detail = None
basename = None