authentik.enterprise.providers.ssf.views.stream

  1from uuid import uuid4
  2
  3from django.http import Http404, HttpRequest
  4from django.urls import reverse
  5from rest_framework.exceptions import PermissionDenied, ValidationError
  6from rest_framework.fields import CharField, ChoiceField, ListField, SerializerMethodField
  7from rest_framework.request import Request
  8from rest_framework.response import Response
  9from structlog.stdlib import get_logger
 10
 11from authentik.api.validation import validate
 12from authentik.core.api.utils import ModelSerializer, PassiveSerializer
 13from authentik.enterprise.providers.ssf.models import (
 14    DeliveryMethods,
 15    EventTypes,
 16    SSFProvider,
 17    Stream,
 18    StreamStatus,
 19)
 20from authentik.enterprise.providers.ssf.tasks import send_ssf_events
 21from authentik.enterprise.providers.ssf.views.base import SSFStreamView
 22
 23LOGGER = get_logger()
 24
 25
 26class StreamDeliverySerializer(PassiveSerializer):
 27    method = ChoiceField(choices=[(x.value, x.value) for x in DeliveryMethods])
 28    endpoint_url = CharField(required=False)
 29    authorization_header = CharField(required=False)
 30
 31    def validate_method(self, method: DeliveryMethods):
 32        """Currently only push is supported"""
 33        if method == DeliveryMethods.RISC_POLL:
 34            raise ValidationError("Polling for SSF events is not currently supported.")
 35        return method
 36
 37    def validate(self, attrs: dict) -> dict:
 38        if attrs.get("method") in [DeliveryMethods.RISC_PUSH, DeliveryMethods.RFC_PUSH]:
 39            if not attrs.get("endpoint_url"):
 40                raise ValidationError("Endpoint URL is required when using push.")
 41        return attrs
 42
 43
 44class StreamSerializer(ModelSerializer):
 45    delivery = StreamDeliverySerializer()
 46    events_requested = ListField(
 47        child=ChoiceField(choices=[(x.value, x.value) for x in EventTypes])
 48    )
 49    format = CharField(default="iss_sub")
 50    aud = ListField(child=CharField(), allow_empty=True, default=list)
 51
 52    def create(self, validated_data):
 53        provider: SSFProvider = validated_data["provider"]
 54        request: HttpRequest = self.context["request"]
 55        iss = request.build_absolute_uri(
 56            reverse(
 57                "authentik_providers_ssf:configuration",
 58                kwargs={
 59                    "application_slug": provider.backchannel_application.slug,
 60                },
 61            )
 62        )
 63        # Ensure that streams always get SET verification events sent to them
 64        validated_data["events_requested"].append(EventTypes.SET_VERIFICATION)
 65        stream_id = uuid4()
 66        default_aud = f"goauthentik.io/providers/ssf/{str(stream_id)}"
 67        return super().create(
 68            {
 69                "delivery_method": validated_data["delivery"]["method"],
 70                "endpoint_url": validated_data["delivery"].get("endpoint_url"),
 71                "authorization_header": validated_data["delivery"].get("authorization_header"),
 72                "format": validated_data["format"],
 73                "provider": validated_data["provider"],
 74                "events_requested": validated_data["events_requested"],
 75                "aud": validated_data["aud"] or [default_aud],
 76                "iss": iss,
 77                "pk": stream_id,
 78            }
 79        )
 80
 81    class Meta:
 82        model = Stream
 83        fields = [
 84            "delivery",
 85            "events_requested",
 86            "format",
 87            "aud",
 88        ]
 89
 90
 91class StreamResponseSerializer(PassiveSerializer):
 92    stream_id = CharField(source="pk")
 93    iss = CharField()
 94    aud = ListField(child=CharField())
 95    delivery = SerializerMethodField()
 96    format = CharField()
 97
 98    events_requested = ListField(child=CharField())
 99    events_supported = SerializerMethodField()
100    events_delivered = ListField(child=CharField(), source="events_requested")
101
102    def get_delivery(self, instance: Stream) -> StreamDeliverySerializer:
103        return {
104            "method": instance.delivery_method,
105            "endpoint_url": instance.endpoint_url,
106        }
107
108    def get_events_supported(self, instance: Stream) -> list[str]:
109        return [
110            EventTypes.CAEP_SESSION_REVOKED,
111            EventTypes.CAEP_CREDENTIAL_CHANGE,
112            EventTypes.SET_VERIFICATION,
113        ]
114
115
class StreamView(SSFStreamView):
    """Read, create, modify and soft-delete a stream of the current provider."""

    @staticmethod
    def _stream_response_data(stream: Stream, request: Request):
        """Serialize `stream` into the response representation."""
        serializer = StreamResponseSerializer(instance=stream, context={"request": request})
        return serializer.data

    def _update_stream(self, request: Request, partial: bool) -> Response:
        """Validate the request body against the looked-up stream and save it."""
        current = self.get_object()
        serializer = StreamSerializer(current, data=request.data, partial=partial)
        serializer.is_valid(raise_exception=True)
        serializer.save()
        payload = self._stream_response_data(serializer.instance, request)
        return Response(payload, status=200)

    def get(self, request: Request, *args, **kwargs):
        """Return the configuration of the looked-up stream."""
        return Response(self._stream_response_data(self.get_object(), request))

    @validate(StreamSerializer)
    def post(self, request: Request, *args, body: StreamSerializer, **kwargs) -> Response:
        """Create a stream and immediately send it a verification event.

        Raises:
            PermissionDenied: if the user lacks the `add_stream` permission on
                this provider.
        """
        allowed = request.user.has_perm("authentik_providers_ssf.add_stream", self.provider)
        if not allowed:
            raise PermissionDenied(
                "User does not have permission to create stream for this provider."
            )
        instance: Stream = body.save(provider=self.provider)

        LOGGER.info("Sending verification event", stream=instance)
        subject = {"format": "opaque", "id": str(instance.uuid)}
        send_ssf_events(
            EventTypes.SET_VERIFICATION,
            {},
            stream_filter={"pk": instance.uuid},
            request=request,
            sub_id=subject,
        )
        return Response(self._stream_response_data(instance, request), status=201)

    def patch(self, request: Request, *args, **kwargs) -> Response:
        """Partially update the looked-up stream."""
        return self._update_stream(request, partial=True)

    def put(self, request: Request, *args, **kwargs) -> Response:
        """Replace the configuration of the looked-up stream."""
        return self._update_stream(request, partial=False)

    def delete(self, request: Request, *args, **kwargs) -> Response:
        """Mark the stream as deleted; an already deleted stream yields a 404."""
        stream = self.get_object()
        if stream.status != StreamStatus.DISABLED_DELETED:
            stream.status = StreamStatus.DISABLED_DELETED
            stream.save()
            return Response(status=204)
        raise Http404
170
171
class StreamVerifyView(SSFStreamView):
    """Trigger a verification event for the looked-up stream on request."""

    def post(self, request: Request, *args, **kwargs):
        """Send a SET verification event carrying the optional `state` from the body."""
        stream = self.get_object()
        # `state` stays None when the request body does not supply it.
        event_data = {"state": request.data.get("state")}
        subject = {"format": "opaque", "id": str(stream.uuid)}
        send_ssf_events(
            EventTypes.SET_VERIFICATION,
            event_data,
            stream_filter={"pk": stream.uuid},
            request=request,
            sub_id=subject,
        )
        return Response(status=204)
187
188
class StreamStatusView(SSFStreamView):
    """Read and change the status of the looked-up stream."""

    class StreamStatusSerializer(PassiveSerializer):
        """Request body accepted when changing a stream's status."""

        stream_id = CharField()
        status = ChoiceField(choices=StreamStatus.choices)

    @staticmethod
    def _status_payload(stream: Stream) -> dict:
        """Build the response body shared by `get` and `post`."""
        return {
            "stream_id": str(stream.pk),
            "status": str(stream.status),
        }

    def get(self, request: Request, *args, **kwargs):
        """Return the id and current status of the stream."""
        return Response(self._status_payload(self.get_object()))

    def post(self, request: Request, *args, **kwargs):
        """Validate the body, store the new status and return the updated state."""
        stream = self.get_object()
        body = self.StreamStatusSerializer(stream, data=request.data)
        body.is_valid(raise_exception=True)
        stream.status = body.validated_data["status"]
        stream.save()
        return Response(self._status_payload(stream))
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class StreamDeliverySerializer(authentik.core.api.utils.PassiveSerializer):
27class StreamDeliverySerializer(PassiveSerializer):
28    method = ChoiceField(choices=[(x.value, x.value) for x in DeliveryMethods])
29    endpoint_url = CharField(required=False)
30    authorization_header = CharField(required=False)
31
32    def validate_method(self, method: DeliveryMethods):
33        """Currently only push is supported"""
34        if method == DeliveryMethods.RISC_POLL:
35            raise ValidationError("Polling for SSF events is not currently supported.")
36        return method
37
38    def validate(self, attrs: dict) -> dict:
39        if attrs.get("method") in [DeliveryMethods.RISC_PUSH, DeliveryMethods.RFC_PUSH]:
40            if not attrs.get("endpoint_url"):
41                raise ValidationError("Endpoint URL is required when using push.")
42        return attrs

Base serializer class which doesn't implement create/update methods

method
endpoint_url
authorization_header
def validate_method( self, method: authentik.enterprise.providers.ssf.models.DeliveryMethods):
32    def validate_method(self, method: DeliveryMethods):
33        """Currently only push is supported"""
34        if method == DeliveryMethods.RISC_POLL:
35            raise ValidationError("Polling for SSF events is not currently supported.")
36        return method

Currently only push is supported

def validate(self, attrs: dict) -> dict:
38    def validate(self, attrs: dict) -> dict:
39        if attrs.get("method") in [DeliveryMethods.RISC_PUSH, DeliveryMethods.RFC_PUSH]:
40            if not attrs.get("endpoint_url"):
41                raise ValidationError("Endpoint URL is required when using push.")
42        return attrs
class StreamSerializer(authentik.core.api.utils.ModelSerializer):
45class StreamSerializer(ModelSerializer):
46    delivery = StreamDeliverySerializer()
47    events_requested = ListField(
48        child=ChoiceField(choices=[(x.value, x.value) for x in EventTypes])
49    )
50    format = CharField(default="iss_sub")
51    aud = ListField(child=CharField(), allow_empty=True, default=list)
52
53    def create(self, validated_data):
54        provider: SSFProvider = validated_data["provider"]
55        request: HttpRequest = self.context["request"]
56        iss = request.build_absolute_uri(
57            reverse(
58                "authentik_providers_ssf:configuration",
59                kwargs={
60                    "application_slug": provider.backchannel_application.slug,
61                },
62            )
63        )
64        # Ensure that streams always get SET verification events sent to them
65        validated_data["events_requested"].append(EventTypes.SET_VERIFICATION)
66        stream_id = uuid4()
67        default_aud = f"goauthentik.io/providers/ssf/{str(stream_id)}"
68        return super().create(
69            {
70                "delivery_method": validated_data["delivery"]["method"],
71                "endpoint_url": validated_data["delivery"].get("endpoint_url"),
72                "authorization_header": validated_data["delivery"].get("authorization_header"),
73                "format": validated_data["format"],
74                "provider": validated_data["provider"],
75                "events_requested": validated_data["events_requested"],
76                "aud": validated_data["aud"] or [default_aud],
77                "iss": iss,
78                "pk": stream_id,
79            }
80        )
81
82    class Meta:
83        model = Stream
84        fields = [
85            "delivery",
86            "events_requested",
87            "format",
88            "aud",
89        ]

A ModelSerializer is just a regular Serializer, except that:

  • A set of default fields are automatically populated.
  • A set of default validators are automatically populated.
  • Default .create() and .update() implementations are provided.

The process of automatically determining a set of serializer fields based on the model fields is reasonably complex, but you almost certainly don't need to dig into the implementation.

If the ModelSerializer class doesn't generate the set of fields that you need you should either declare the extra/differing fields explicitly on the serializer class, or simply use a Serializer class.

delivery
events_requested
format
aud
def create(self, validated_data):
53    def create(self, validated_data):
54        provider: SSFProvider = validated_data["provider"]
55        request: HttpRequest = self.context["request"]
56        iss = request.build_absolute_uri(
57            reverse(
58                "authentik_providers_ssf:configuration",
59                kwargs={
60                    "application_slug": provider.backchannel_application.slug,
61                },
62            )
63        )
64        # Ensure that streams always get SET verification events sent to them
65        validated_data["events_requested"].append(EventTypes.SET_VERIFICATION)
66        stream_id = uuid4()
67        default_aud = f"goauthentik.io/providers/ssf/{str(stream_id)}"
68        return super().create(
69            {
70                "delivery_method": validated_data["delivery"]["method"],
71                "endpoint_url": validated_data["delivery"].get("endpoint_url"),
72                "authorization_header": validated_data["delivery"].get("authorization_header"),
73                "format": validated_data["format"],
74                "provider": validated_data["provider"],
75                "events_requested": validated_data["events_requested"],
76                "aud": validated_data["aud"] or [default_aud],
77                "iss": iss,
78                "pk": stream_id,
79            }
80        )

We have a bit of extra checking around this in order to provide descriptive messages when something goes wrong, but this method is essentially just:

return ExampleModel.objects.create(**validated_data)

If there are many to many fields present on the instance then they cannot be set until the model is instantiated, in which case the implementation is like so:

example_relationship = validated_data.pop('example_relationship')
instance = ExampleModel.objects.create(**validated_data)
instance.example_relationship = example_relationship
return instance

The default implementation also does not handle nested relationships. If you want to support writable nested relationships you'll need to write an explicit .create() method.

class StreamSerializer.Meta:
82    class Meta:
83        model = Stream
84        fields = [
85            "delivery",
86            "events_requested",
87            "format",
88            "aud",
89        ]
fields = ['delivery', 'events_requested', 'format', 'aud']
class StreamResponseSerializer(authentik.core.api.utils.PassiveSerializer):
 92class StreamResponseSerializer(PassiveSerializer):
 93    stream_id = CharField(source="pk")
 94    iss = CharField()
 95    aud = ListField(child=CharField())
 96    delivery = SerializerMethodField()
 97    format = CharField()
 98
 99    events_requested = ListField(child=CharField())
100    events_supported = SerializerMethodField()
101    events_delivered = ListField(child=CharField(), source="events_requested")
102
103    def get_delivery(self, instance: Stream) -> StreamDeliverySerializer:
104        return {
105            "method": instance.delivery_method,
106            "endpoint_url": instance.endpoint_url,
107        }
108
109    def get_events_supported(self, instance: Stream) -> list[str]:
110        return [
111            EventTypes.CAEP_SESSION_REVOKED,
112            EventTypes.CAEP_CREDENTIAL_CHANGE,
113            EventTypes.SET_VERIFICATION,
114        ]

Base serializer class which doesn't implement create/update methods

stream_id
iss
aud
delivery
format
events_requested
events_supported
events_delivered
def get_delivery( self, instance: authentik.enterprise.providers.ssf.models.Stream) -> StreamDeliverySerializer:
103    def get_delivery(self, instance: Stream) -> StreamDeliverySerializer:
104        return {
105            "method": instance.delivery_method,
106            "endpoint_url": instance.endpoint_url,
107        }
def get_events_supported( self, instance: authentik.enterprise.providers.ssf.models.Stream) -> list[str]:
109    def get_events_supported(self, instance: Stream) -> list[str]:
110        return [
111            EventTypes.CAEP_SESSION_REVOKED,
112            EventTypes.CAEP_CREDENTIAL_CHANGE,
113            EventTypes.SET_VERIFICATION,
114        ]
117class StreamView(SSFStreamView):
118
119    def get(self, request: Request, *args, **kwargs):
120        stream = self.get_object()
121        return Response(
122            StreamResponseSerializer(instance=stream, context={"request": request}).data
123        )
124
125    @validate(StreamSerializer)
126    def post(self, request: Request, *args, body: StreamSerializer, **kwargs) -> Response:
127        if not request.user.has_perm("authentik_providers_ssf.add_stream", self.provider):
128            raise PermissionDenied(
129                "User does not have permission to create stream for this provider."
130            )
131        instance: Stream = body.save(provider=self.provider)
132
133        LOGGER.info("Sending verification event", stream=instance)
134        send_ssf_events(
135            EventTypes.SET_VERIFICATION,
136            {},
137            stream_filter={"pk": instance.uuid},
138            request=request,
139            sub_id={"format": "opaque", "id": str(instance.uuid)},
140        )
141        response = StreamResponseSerializer(instance=instance, context={"request": request}).data
142        return Response(response, status=201)
143
144    def patch(self, request: Request, *args, **kwargs) -> Response:
145        stream = self.get_object()
146        serializer = StreamSerializer(stream, data=request.data, partial=True)
147        serializer.is_valid(raise_exception=True)
148        serializer.save()
149        response = StreamResponseSerializer(
150            instance=serializer.instance, context={"request": request}
151        ).data
152        return Response(response, status=200)
153
154    def put(self, request: Request, *args, **kwargs) -> Response:
155        stream = self.get_object()
156        serializer = StreamSerializer(stream, data=request.data)
157        serializer.is_valid(raise_exception=True)
158        serializer.save()
159        response = StreamResponseSerializer(
160            instance=serializer.instance, context={"request": request}
161        ).data
162        return Response(response, status=200)
163
164    def delete(self, request: Request, *args, **kwargs) -> Response:
165        stream = self.get_object()
166        if stream.status == StreamStatus.DISABLED_DELETED:
167            raise Http404
168        stream.status = StreamStatus.DISABLED_DELETED
169        stream.save()
170        return Response(status=204)

Intentionally simple parent class for all views. Only implements dispatch-by-method and simple sanity checking.

def get(self, request: rest_framework.request.Request, *args, **kwargs):
119    def get(self, request: Request, *args, **kwargs):
120        stream = self.get_object()
121        return Response(
122            StreamResponseSerializer(instance=stream, context={"request": request}).data
123        )
@validate(StreamSerializer)
def post( self, request: rest_framework.request.Request, *args, body: StreamSerializer, **kwargs) -> rest_framework.response.Response:
125    @validate(StreamSerializer)
126    def post(self, request: Request, *args, body: StreamSerializer, **kwargs) -> Response:
127        if not request.user.has_perm("authentik_providers_ssf.add_stream", self.provider):
128            raise PermissionDenied(
129                "User does not have permission to create stream for this provider."
130            )
131        instance: Stream = body.save(provider=self.provider)
132
133        LOGGER.info("Sending verification event", stream=instance)
134        send_ssf_events(
135            EventTypes.SET_VERIFICATION,
136            {},
137            stream_filter={"pk": instance.uuid},
138            request=request,
139            sub_id={"format": "opaque", "id": str(instance.uuid)},
140        )
141        response = StreamResponseSerializer(instance=instance, context={"request": request}).data
142        return Response(response, status=201)
def patch( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
144    def patch(self, request: Request, *args, **kwargs) -> Response:
145        stream = self.get_object()
146        serializer = StreamSerializer(stream, data=request.data, partial=True)
147        serializer.is_valid(raise_exception=True)
148        serializer.save()
149        response = StreamResponseSerializer(
150            instance=serializer.instance, context={"request": request}
151        ).data
152        return Response(response, status=200)
def put( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
154    def put(self, request: Request, *args, **kwargs) -> Response:
155        stream = self.get_object()
156        serializer = StreamSerializer(stream, data=request.data)
157        serializer.is_valid(raise_exception=True)
158        serializer.save()
159        response = StreamResponseSerializer(
160            instance=serializer.instance, context={"request": request}
161        ).data
162        return Response(response, status=200)
def delete( self, request: rest_framework.request.Request, *args, **kwargs) -> rest_framework.response.Response:
164    def delete(self, request: Request, *args, **kwargs) -> Response:
165        stream = self.get_object()
166        if stream.status == StreamStatus.DISABLED_DELETED:
167            raise Http404
168        stream.status = StreamStatus.DISABLED_DELETED
169        stream.save()
170        return Response(status=204)
173class StreamVerifyView(SSFStreamView):
174
175    def post(self, request: Request, *args, **kwargs):
176        stream = self.get_object()
177        state = request.data.get("state", None)
178        send_ssf_events(
179            EventTypes.SET_VERIFICATION,
180            {
181                "state": state,
182            },
183            stream_filter={"pk": stream.uuid},
184            request=request,
185            sub_id={"format": "opaque", "id": str(stream.uuid)},
186        )
187        return Response(status=204)

Intentionally simple parent class for all views. Only implements dispatch-by-method and simple sanity checking.

def post(self, request: rest_framework.request.Request, *args, **kwargs):
175    def post(self, request: Request, *args, **kwargs):
176        stream = self.get_object()
177        state = request.data.get("state", None)
178        send_ssf_events(
179            EventTypes.SET_VERIFICATION,
180            {
181                "state": state,
182            },
183            stream_filter={"pk": stream.uuid},
184            request=request,
185            sub_id={"format": "opaque", "id": str(stream.uuid)},
186        )
187        return Response(status=204)
190class StreamStatusView(SSFStreamView):
191
192    class StreamStatusSerializer(PassiveSerializer):
193        stream_id = CharField()
194        status = ChoiceField(choices=StreamStatus.choices)
195
196    def get(self, request: Request, *args, **kwargs):
197        stream = self.get_object()
198        return Response(
199            {
200                "stream_id": str(stream.pk),
201                "status": str(stream.status),
202            }
203        )
204
205    def post(self, request: Request, *args, **kwargs):
206        stream = self.get_object()
207        serializer = self.StreamStatusSerializer(stream, data=request.data)
208        serializer.is_valid(raise_exception=True)
209        stream.status = serializer.validated_data["status"]
210        stream.save()
211        return Response(
212            {
213                "stream_id": str(stream.pk),
214                "status": str(stream.status),
215            }
216        )

Intentionally simple parent class for all views. Only implements dispatch-by-method and simple sanity checking.

def get(self, request: rest_framework.request.Request, *args, **kwargs):
196    def get(self, request: Request, *args, **kwargs):
197        stream = self.get_object()
198        return Response(
199            {
200                "stream_id": str(stream.pk),
201                "status": str(stream.status),
202            }
203        )
def post(self, request: rest_framework.request.Request, *args, **kwargs):
205    def post(self, request: Request, *args, **kwargs):
206        stream = self.get_object()
207        serializer = self.StreamStatusSerializer(stream, data=request.data)
208        serializer.is_valid(raise_exception=True)
209        stream.status = serializer.validated_data["status"]
210        stream.save()
211        return Response(
212            {
213                "stream_id": str(stream.pk),
214                "status": str(stream.status),
215            }
216        )
class StreamStatusView.StreamStatusSerializer(authentik.core.api.utils.PassiveSerializer):
192    class StreamStatusSerializer(PassiveSerializer):
193        stream_id = CharField()
194        status = ChoiceField(choices=StreamStatus.choices)

Base serializer class which doesn't implement create/update methods

stream_id
status