authentik.enterprise.providers.ssf.models

  1from datetime import datetime
  2from functools import cached_property
  3from uuid import uuid4
  4
  5from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePrivateKey
  6from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey
  7from cryptography.hazmat.primitives.asymmetric.types import PrivateKeyTypes
  8from django.contrib.postgres.fields import ArrayField
  9from django.db import models
 10from django.templatetags.static import static
 11from django.utils.timezone import now
 12from django.utils.translation import gettext_lazy as _
 13from jwt import encode
 14
 15from authentik.core.models import BackchannelProvider, ExpiringModel, Token
 16from authentik.crypto.models import CertificateKeyPair
 17from authentik.lib.models import CreatedUpdatedModel, InternallyManagedMixin
 18from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
 19from authentik.providers.oauth2.models import JWTAlgorithms, OAuth2Provider
 20from authentik.tasks.models import TasksModel
 21
 22
class EventTypes(models.TextChoices):
    """SSF Event types supported by authentik"""

    # Each member's value is the event-type URI; Stream.prepare_event_payload uses it
    # as the key of the "events" object in the generated payload.
    CAEP_SESSION_REVOKED = "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
    CAEP_CREDENTIAL_CHANGE = "https://schemas.openid.net/secevent/caep/event-type/credential-change"
    SET_VERIFICATION = "https://schemas.openid.net/secevent/ssf/event-type/verification"
 29
 30
class DeliveryMethods(models.TextChoices):
    """SSF Delivery methods"""

    # RISC-namespaced delivery method URIs; no explicit label is given for these two.
    RISC_PUSH = "https://schemas.openid.net/secevent/risc/delivery-method/push"
    RISC_POLL = "https://schemas.openid.net/secevent/risc/delivery-method/poll"
    # RFC 8935 (push) and RFC 8936 (pull) URNs, each with an explicit translatable label.
    RFC_PUSH = "urn:ietf:rfc:8935", _("SSF RFC Push")
    RFC_PULL = "urn:ietf:rfc:8936", _("SSF RFC Pull")
 38
 39
class SSFEventStatus(models.TextChoices):
    """SSF Event status"""

    # Status assigned by Stream.prepare_event_payload to newly prepared events.
    PENDING_NEW = "pending_new"
    # NOTE(review): presumably set after a failed delivery attempt - confirm against the sender.
    PENDING_FAILED = "pending_failed"
    # Only events in this status are cleaned up by StreamEvent.expire_action.
    SENT = "sent"
 46
 47
class StreamStatus(models.TextChoices):
    """SSF Stream status"""

    # ENABLED is the default value of Stream.status.
    ENABLED = "enabled"
    PAUSED = "paused"
    DISABLED = "disabled"
 53
 54
 55class SSFProvider(TasksModel, BackchannelProvider):
 56    """Shared Signals Framework provider to allow applications to
 57    receive user events from authentik."""
 58
 59    signing_key = models.ForeignKey(
 60        CertificateKeyPair,
 61        verbose_name=_("Signing Key"),
 62        on_delete=models.CASCADE,
 63        help_text=_("Key used to sign the SSF Events."),
 64    )
 65
 66    push_verify_certificates = models.BooleanField(default=True)
 67
 68    oidc_auth_providers = models.ManyToManyField(OAuth2Provider, blank=True, default=None)
 69
 70    token = models.ForeignKey(Token, on_delete=models.CASCADE, null=True, default=None)
 71
 72    event_retention = models.TextField(
 73        default="days=30",
 74        validators=[timedelta_string_validator],
 75    )
 76
 77    @cached_property
 78    def jwt_key(self) -> tuple[PrivateKeyTypes, str]:
 79        """Get either the configured certificate or the client secret"""
 80        key: CertificateKeyPair = self.signing_key
 81        private_key = key.private_key
 82        if isinstance(private_key, RSAPrivateKey):
 83            return private_key, JWTAlgorithms.RS256
 84        if isinstance(private_key, EllipticCurvePrivateKey):
 85            return private_key, JWTAlgorithms.ES256
 86        raise ValueError(f"Invalid private key type: {type(private_key)}")
 87
 88    @property
 89    def service_account_identifier(self) -> str:
 90        return f"ak-providers-ssf-{self.pk}"
 91
 92    @property
 93    def serializer(self):
 94        from authentik.enterprise.providers.ssf.api.providers import SSFProviderSerializer
 95
 96        return SSFProviderSerializer
 97
 98    @property
 99    def icon_url(self) -> str | None:
100        return static("authentik/sources/ssf.svg")
101
102    @property
103    def component(self) -> str:
104        return "ak-provider-ssf-form"
105
106    class Meta:
107        verbose_name = _("Shared Signals Framework Provider")
108        verbose_name_plural = _("Shared Signals Framework Providers")
109        permissions = [
110            # This overrides the default "add_stream" permission of the Stream object,
111            # as the user requesting to add a stream must have the permission on the provider
112            ("add_stream", _("Add stream to SSF provider")),
113        ]
114
115
class Stream(models.Model):
    """SSF Stream"""

    uuid = models.UUIDField(default=uuid4, primary_key=True, editable=False)

    status = models.TextField(choices=StreamStatus.choices, default=StreamStatus.ENABLED)

    provider = models.ForeignKey(SSFProvider, on_delete=models.CASCADE)

    delivery_method = models.TextField(choices=DeliveryMethods.choices)
    endpoint_url = models.TextField(null=True)
    authorization_header = models.TextField(null=True, default=None)

    events_requested = ArrayField(models.TextField(choices=EventTypes.choices), default=list)
    format = models.TextField()
    # Copied into the "aud" claim of every prepared event payload.
    aud = ArrayField(models.TextField(), default=list)

    # Copied into the "iss" claim of every prepared event payload.
    iss = models.TextField()

    class Meta:
        verbose_name = _("SSF Stream")
        verbose_name_plural = _("SSF Streams")
        # No "add" permission here: SSFProvider.Meta declares "add_stream" instead.
        default_permissions = ["change", "delete", "view"]

    def __str__(self) -> str:
        return "SSF Stream"

    def prepare_event_payload(self, type: EventTypes, event_data: dict, **kwargs) -> dict:
        """Build the field values for a new event on this stream.

        The returned dict's keys mirror the StreamEvent fields (uuid, stream_id,
        type, status, expiring, expires, payload). The nested "payload" dict holds
        the token claims: jti, aud, iat, iss and an "events" object mapping the
        event type to ``event_data``.

        Args:
            type: Event type; stored as "type" and used as the key in "events".
            event_data: Event-specific data placed under the event type key.
            **kwargs: Extra claims merged into "payload" last, so they override
                any of the generated claims with the same name.
        """
        jti = uuid4()
        # Read the clock once so that "iat" and "expires" derive from the same
        # timezone-aware instant.
        issued_at = now()
        return {
            "uuid": jti,
            "stream_id": str(self.pk),
            "type": type,
            "expiring": True,
            "status": SSFEventStatus.PENDING_NEW,
            "expires": issued_at + timedelta_from_string(self.provider.event_retention),
            "payload": {
                "jti": jti.hex,
                "aud": self.aud,
                "iat": int(issued_at.timestamp()),
                "iss": self.iss,
                "events": {type: event_data},
                **kwargs,
            },
        }

    def encode(self, data: dict) -> str:
        """Sign ``data`` as a JWT using the provider's signing key.

        The header always carries ``typ: secevent+jwt`` and, when the provider
        has a signing key, that key's ``kid``. Key and algorithm come from
        ``SSFProvider.jwt_key``.
        """
        headers = {"typ": "secevent+jwt"}
        if self.provider.signing_key:
            headers["kid"] = self.provider.signing_key.kid
        key, alg = self.provider.jwt_key
        return encode(data, key, algorithm=alg, headers=headers)
169
170
class StreamEvent(InternallyManagedMixin, CreatedUpdatedModel, ExpiringModel):
    """Single stream event to be sent"""

    uuid = models.UUIDField(default=uuid4, primary_key=True, editable=False)

    stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
    status = models.TextField(choices=SSFEventStatus.choices)

    type = models.TextField(choices=EventTypes.choices)
    payload = models.JSONField(default=dict)

    def expire_action(self, *args, **kwargs):
        """Run the inherited expiry action only for events that were sent.

        Events in any other status are left untouched and None is returned.
        """
        if self.status == SSFEventStatus.SENT:
            return super().expire_action(*args, **kwargs)
        return None

    def __str__(self):
        return f"Stream event {self.type}"

    class Meta:
        verbose_name = _("SSF Stream Event")
        verbose_name_plural = _("SSF Stream Events")
        # Newest events first.
        ordering = ("-created",)
class EventTypes(django.db.models.enums.TextChoices):
24class EventTypes(models.TextChoices):
25    """SSF Event types supported by authentik"""
26
27    CAEP_SESSION_REVOKED = "https://schemas.openid.net/secevent/caep/event-type/session-revoked"
28    CAEP_CREDENTIAL_CHANGE = "https://schemas.openid.net/secevent/caep/event-type/credential-change"
29    SET_VERIFICATION = "https://schemas.openid.net/secevent/ssf/event-type/verification"

SSF Event types supported by authentik

CAEP_SESSION_REVOKED = EventTypes.CAEP_SESSION_REVOKED
CAEP_CREDENTIAL_CHANGE = EventTypes.CAEP_CREDENTIAL_CHANGE
SET_VERIFICATION = EventTypes.SET_VERIFICATION
class DeliveryMethods(django.db.models.enums.TextChoices):
32class DeliveryMethods(models.TextChoices):
33    """SSF Delivery methods"""
34
35    RISC_PUSH = "https://schemas.openid.net/secevent/risc/delivery-method/push"
36    RISC_POLL = "https://schemas.openid.net/secevent/risc/delivery-method/poll"
37    RFC_PUSH = "urn:ietf:rfc:8935", _("SSF RFC Push")
38    RFC_PULL = "urn:ietf:rfc:8936", _("SSF RFC Pull")

SSF Delivery methods

class SSFEventStatus(django.db.models.enums.TextChoices):
41class SSFEventStatus(models.TextChoices):
42    """SSF Event status"""
43
44    PENDING_NEW = "pending_new"
45    PENDING_FAILED = "pending_failed"
46    SENT = "sent"

SSF Event status

class StreamStatus(django.db.models.enums.TextChoices):
49class StreamStatus(models.TextChoices):
50
51    ENABLED = "enabled"
52    PAUSED = "paused"
53    DISABLED = "disabled"

SSF Stream status: a stream is enabled, paused or disabled.

 56class SSFProvider(TasksModel, BackchannelProvider):
 57    """Shared Signals Framework provider to allow applications to
 58    receive user events from authentik."""
 59
 60    signing_key = models.ForeignKey(
 61        CertificateKeyPair,
 62        verbose_name=_("Signing Key"),
 63        on_delete=models.CASCADE,
 64        help_text=_("Key used to sign the SSF Events."),
 65    )
 66
 67    push_verify_certificates = models.BooleanField(default=True)
 68
 69    oidc_auth_providers = models.ManyToManyField(OAuth2Provider, blank=True, default=None)
 70
 71    token = models.ForeignKey(Token, on_delete=models.CASCADE, null=True, default=None)
 72
 73    event_retention = models.TextField(
 74        default="days=30",
 75        validators=[timedelta_string_validator],
 76    )
 77
 78    @cached_property
 79    def jwt_key(self) -> tuple[PrivateKeyTypes, str]:
 80        """Get either the configured certificate or the client secret"""
 81        key: CertificateKeyPair = self.signing_key
 82        private_key = key.private_key
 83        if isinstance(private_key, RSAPrivateKey):
 84            return private_key, JWTAlgorithms.RS256
 85        if isinstance(private_key, EllipticCurvePrivateKey):
 86            return private_key, JWTAlgorithms.ES256
 87        raise ValueError(f"Invalid private key type: {type(private_key)}")
 88
 89    @property
 90    def service_account_identifier(self) -> str:
 91        return f"ak-providers-ssf-{self.pk}"
 92
 93    @property
 94    def serializer(self):
 95        from authentik.enterprise.providers.ssf.api.providers import SSFProviderSerializer
 96
 97        return SSFProviderSerializer
 98
 99    @property
100    def icon_url(self) -> str | None:
101        return static("authentik/sources/ssf.svg")
102
103    @property
104    def component(self) -> str:
105        return "ak-provider-ssf-form"
106
107    class Meta:
108        verbose_name = _("Shared Signals Framework Provider")
109        verbose_name_plural = _("Shared Signals Framework Providers")
110        permissions = [
111            # This overrides the default "add_stream" permission of the Stream object,
112            # as the user requesting to add a stream must have the permission on the provider
113            ("add_stream", _("Add stream to SSF provider")),
114        ]

Shared Signals Framework provider to allow applications to receive user events from authentik.

signing_key

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def push_verify_certificates(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

oidc_auth_providers

Accessor to the related objects manager on the forward and reverse sides of a many-to-many relation.

In the example::

class Pizza(Model):
    toppings = ManyToManyField(Topping, related_name='pizzas')

Pizza.toppings and Topping.pizzas are ManyToManyDescriptor instances.

Most of the implementation is delegated to a dynamically defined manager class built by create_forward_many_to_many_manager() defined below.

token

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def event_retention(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

jwt_key: tuple[cryptography.hazmat.primitives.asymmetric.dh.DHPrivateKey | cryptography.hazmat.primitives.asymmetric.ed25519.Ed25519PrivateKey | cryptography.hazmat.primitives.asymmetric.ed448.Ed448PrivateKey | cryptography.hazmat.primitives.asymmetric.rsa.RSAPrivateKey | cryptography.hazmat.primitives.asymmetric.dsa.DSAPrivateKey | cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePrivateKey | cryptography.hazmat.primitives.asymmetric.x25519.X25519PrivateKey | cryptography.hazmat.primitives.asymmetric.x448.X448PrivateKey, str]
78    @cached_property
79    def jwt_key(self) -> tuple[PrivateKeyTypes, str]:
80        """Get either the configured certificate or the client secret"""
81        key: CertificateKeyPair = self.signing_key
82        private_key = key.private_key
83        if isinstance(private_key, RSAPrivateKey):
84            return private_key, JWTAlgorithms.RS256
85        if isinstance(private_key, EllipticCurvePrivateKey):
86            return private_key, JWTAlgorithms.ES256
87        raise ValueError(f"Invalid private key type: {type(private_key)}")

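Return the signing key's private key and the matching JWT algorithm (RS256 for RSA keys, ES256 for elliptic-curve keys); any other key type raises ValueError.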

service_account_identifier: str
89    @property
90    def service_account_identifier(self) -> str:
91        return f"ak-providers-ssf-{self.pk}"
serializer
93    @property
94    def serializer(self):
95        from authentik.enterprise.providers.ssf.api.providers import SSFProviderSerializer
96
97        return SSFProviderSerializer

Get serializer for this model

icon_url: str | None
 99    @property
100    def icon_url(self) -> str | None:
101        return static("authentik/sources/ssf.svg")
component: str
103    @property
104    def component(self) -> str:
105        return "ak-provider-ssf-form"

Return component used to edit this object

tasks

Accessor to the related objects manager on the one-to-many relation created by GenericRelation.

In the example::

class Post(Model):
    comments = GenericRelation(Comment)

post.comments is a ReverseGenericManyToOneDescriptor instance.

signing_key_id
token_id
provider_ptr_id
provider_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

stream_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_reverse_many_to_one_manager().

class SSFProvider.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class SSFProvider.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class Stream(django.db.models.base.Model):
117class Stream(models.Model):
118    """SSF Stream"""
119
120    uuid = models.UUIDField(default=uuid4, primary_key=True, editable=False)
121
122    status = models.TextField(choices=StreamStatus.choices, default=StreamStatus.ENABLED)
123
124    provider = models.ForeignKey(SSFProvider, on_delete=models.CASCADE)
125
126    delivery_method = models.TextField(choices=DeliveryMethods.choices)
127    endpoint_url = models.TextField(null=True)
128    authorization_header = models.TextField(null=True, default=None)
129
130    events_requested = ArrayField(models.TextField(choices=EventTypes.choices), default=list)
131    format = models.TextField()
132    aud = ArrayField(models.TextField(), default=list)
133
134    iss = models.TextField()
135
136    class Meta:
137        verbose_name = _("SSF Stream")
138        verbose_name_plural = _("SSF Streams")
139        default_permissions = ["change", "delete", "view"]
140
141    def __str__(self) -> str:
142        return "SSF Stream"
143
144    def prepare_event_payload(self, type: EventTypes, event_data: dict, **kwargs) -> dict:
145        jti = uuid4()
146        _now = now()
147        return {
148            "uuid": jti,
149            "stream_id": str(self.pk),
150            "type": type,
151            "expiring": True,
152            "status": SSFEventStatus.PENDING_NEW,
153            "expires": _now + timedelta_from_string(self.provider.event_retention),
154            "payload": {
155                "jti": jti.hex,
156                "aud": self.aud,
157                "iat": int(datetime.now().timestamp()),
158                "iss": self.iss,
159                "events": {type: event_data},
160                **kwargs,
161            },
162        }
163
164    def encode(self, data: dict) -> str:
165        headers = {"typ": "secevent+jwt"}
166        if self.provider.signing_key:
167            headers["kid"] = self.provider.signing_key.kid
168        key, alg = self.provider.jwt_key
169        return encode(data, key, algorithm=alg, headers=headers)

SSF Stream

def uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def status(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

provider

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def delivery_method(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def endpoint_url(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def authorization_header(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def events_requested(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def format(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def aud(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def iss(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def prepare_event_payload( self, type: EventTypes, event_data: dict, **kwargs) -> dict:
144    def prepare_event_payload(self, type: EventTypes, event_data: dict, **kwargs) -> dict:
145        jti = uuid4()
146        _now = now()
147        return {
148            "uuid": jti,
149            "stream_id": str(self.pk),
150            "type": type,
151            "expiring": True,
152            "status": SSFEventStatus.PENDING_NEW,
153            "expires": _now + timedelta_from_string(self.provider.event_retention),
154            "payload": {
155                "jti": jti.hex,
156                "aud": self.aud,
157                "iat": int(datetime.now().timestamp()),
158                "iss": self.iss,
159                "events": {type: event_data},
160                **kwargs,
161            },
162        }
def encode(self, data: dict) -> str:
164    def encode(self, data: dict) -> str:
165        headers = {"typ": "secevent+jwt"}
166        if self.provider.signing_key:
167            headers["kid"] = self.provider.signing_key.kid
168        key, alg = self.provider.jwt_key
169        return encode(data, key, algorithm=alg, headers=headers)
def get_status_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

provider_id
def get_delivery_method_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def objects(unknown):

Default Django model manager for Stream; the model declares no custom manager.

streamevent_set

Accessor to the related objects manager on the reverse side of a many-to-one relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Parent.children is a ReverseManyToOneDescriptor instance.

Most of the implementation is delegated to a dynamically defined manager class built by create_reverse_many_to_one_manager().

class Stream.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Stream.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

172class StreamEvent(InternallyManagedMixin, CreatedUpdatedModel, ExpiringModel):
173    """Single stream event to be sent"""
174
175    uuid = models.UUIDField(default=uuid4, primary_key=True, editable=False)
176
177    stream = models.ForeignKey(Stream, on_delete=models.CASCADE)
178    status = models.TextField(choices=SSFEventStatus.choices)
179
180    type = models.TextField(choices=EventTypes.choices)
181    payload = models.JSONField(default=dict)
182
183    def expire_action(self, *args, **kwargs):
184        """Only allow automatic cleanup of successfully sent event"""
185        if self.status != SSFEventStatus.SENT:
186            return
187        return super().expire_action(*args, **kwargs)
188
189    def __str__(self):
190        return f"Stream event {self.type}"
191
192    class Meta:
193        verbose_name = _("SSF Stream Event")
194        verbose_name_plural = _("SSF Stream Events")
195        ordering = ("-created",)

Single stream event to be sent

def uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

stream

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def status(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def type(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def payload(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expire_action(self, *args, **kwargs):
183    def expire_action(self, *args, **kwargs):
184        """Only allow automatic cleanup of successfully sent event"""
185        if self.status != SSFEventStatus.SENT:
186            return
187        return super().expire_action(*args, **kwargs)

Only allow automatic cleanup of successfully sent event

def created(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def last_updated(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

stream_id
def get_status_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_type_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_last_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

class StreamEvent.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class StreamEvent.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.