authentik.outposts.api.outposts

Outpost API Views

  1"""Outpost API Views"""
  2
  3from dacite.core import from_dict
  4from dacite.exceptions import DaciteError
  5from django_filters.filters import ModelMultipleChoiceFilter
  6from django_filters.filterset import FilterSet
  7from drf_spectacular.utils import extend_schema
  8from rest_framework.decorators import action
  9from rest_framework.exceptions import ValidationError
 10from rest_framework.fields import BooleanField, CharField, DateTimeField, SerializerMethodField
 11from rest_framework.relations import PrimaryKeyRelatedField
 12from rest_framework.request import Request
 13from rest_framework.response import Response
 14from rest_framework.viewsets import ModelViewSet
 15
 16from authentik import authentik_build_hash
 17from authentik.admin.api.system import fips_enabled
 18from authentik.core.api.providers import ProviderSerializer
 19from authentik.core.api.used_by import UsedByMixin
 20from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer
 21from authentik.core.models import Provider
 22from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
 23from authentik.outposts.api.service_connections import ServiceConnectionSerializer
 24from authentik.outposts.apps import MANAGED_OUTPOST, MANAGED_OUTPOST_NAME
 25from authentik.outposts.models import (
 26    Outpost,
 27    OutpostConfig,
 28    OutpostType,
 29    default_outpost_config,
 30)
 31from authentik.providers.ldap.models import LDAPProvider
 32from authentik.providers.proxy.models import ProxyProvider
 33from authentik.providers.rac.models import RACProvider
 34from authentik.providers.radius.models import RadiusProvider
 35
 36
 37class OutpostSerializer(ModelSerializer):
 38    """Outpost Serializer"""
 39
 40    config = JSONDictField(source="_config")
 41    # Need to set allow_empty=True for the embedded outpost with no providers
 42    # is checked for other providers in the API Viewset
 43    providers = PrimaryKeyRelatedField(
 44        allow_empty=True,
 45        many=True,
 46        queryset=Provider.objects.select_subclasses().all(),
 47    )
 48    providers_obj = ProviderSerializer(source="providers", many=True, read_only=True)
 49    service_connection_obj = ServiceConnectionSerializer(
 50        source="service_connection",
 51        read_only=True,
 52        allow_null=True,
 53    )
 54    refresh_interval_s = SerializerMethodField()
 55
 56    def get_refresh_interval_s(self, obj: Outpost) -> int:
 57        return int(timedelta_from_string(obj.config.refresh_interval).total_seconds())
 58
 59    def validate_name(self, name: str) -> str:
 60        """Validate name (especially for embedded outpost)"""
 61        if not self.instance:
 62            return name
 63        if self.instance.managed == MANAGED_OUTPOST and name != MANAGED_OUTPOST_NAME:
 64            raise ValidationError("Embedded outpost's name cannot be changed")
 65        if self.instance.name == MANAGED_OUTPOST_NAME:
 66            self.instance.managed = MANAGED_OUTPOST
 67        return name
 68
 69    def validate_providers(self, providers: list[Provider]) -> list[Provider]:
 70        """Check that all providers match the type of the outpost"""
 71        type_map = {
 72            OutpostType.LDAP: LDAPProvider,
 73            OutpostType.PROXY: ProxyProvider,
 74            OutpostType.RADIUS: RadiusProvider,
 75            OutpostType.RAC: RACProvider,
 76            None: Provider,
 77        }
 78        for provider in providers:
 79            if not isinstance(provider, type_map[self.initial_data.get("type")]):
 80                raise ValidationError(
 81                    f"Outpost type {self.initial_data['type']} can't be used with "
 82                    f"{provider.__class__.__name__} providers."
 83                )
 84        if self.instance and self.instance.managed == MANAGED_OUTPOST:
 85            return providers
 86        if len(providers) < 1:
 87            raise ValidationError("This list may not be empty.")
 88        return providers
 89
 90    def validate_config(self, config) -> dict:
 91        """Check that the config has all required fields"""
 92        try:
 93            parsed = from_dict(OutpostConfig, config)
 94            timedelta_string_validator(parsed.refresh_interval)
 95        except DaciteError as exc:
 96            raise ValidationError(f"Failed to validate config: {str(exc)}") from exc
 97        return config
 98
 99    class Meta:
100        model = Outpost
101        fields = [
102            "pk",
103            "name",
104            "type",
105            "providers",
106            "providers_obj",
107            "service_connection",
108            "service_connection_obj",
109            "refresh_interval_s",
110            "token_identifier",
111            "config",
112            "managed",
113        ]
114        extra_kwargs = {"type": {"required": True}}
115
116
117class OutpostDefaultConfigSerializer(PassiveSerializer):
118    """Global default outpost config"""
119
120    config = JSONDictField(read_only=True)
121
122
123class OutpostHealthSerializer(PassiveSerializer):
124    """Outpost health status"""
125
126    uid = CharField(read_only=True)
127    last_seen = DateTimeField(read_only=True)
128    version = CharField(read_only=True)
129    golang_version = CharField(read_only=True)
130    openssl_enabled = BooleanField(read_only=True)
131    openssl_version = CharField(read_only=True)
132    fips_enabled = SerializerMethodField()
133
134    version_should = CharField(read_only=True)
135    version_outdated = BooleanField(read_only=True)
136
137    build_hash = CharField(read_only=True, required=False)
138    build_hash_should = CharField(read_only=True, required=False)
139
140    hostname = CharField(read_only=True, required=False)
141
142    def get_fips_enabled(self, obj: dict) -> bool | None:
143        """Get FIPS enabled"""
144        if not fips_enabled():
145            return None
146        return obj["fips_enabled"]
147
148
149class OutpostFilter(FilterSet):
150    """Filter for Outposts"""
151
152    providers_by_pk = ModelMultipleChoiceFilter(
153        field_name="providers",
154        queryset=Provider.objects.all(),
155    )
156
157    class Meta:
158        model = Outpost
159        fields = {
160            "providers": ["isnull"],
161            "name": ["iexact", "icontains"],
162            "service_connection__name": ["iexact", "icontains"],
163            "managed": ["iexact", "icontains"],
164        }
165
166
167class OutpostViewSet(UsedByMixin, ModelViewSet):
168    """Outpost Viewset"""
169
170    queryset = Outpost.objects.all()
171    serializer_class = OutpostSerializer
172    filterset_class = OutpostFilter
173    search_fields = [
174        "name",
175        "providers__name",
176    ]
177    ordering = ["name", "service_connection__name"]
178
179    @extend_schema(responses={200: OutpostHealthSerializer(many=True)})
180    @action(methods=["GET"], detail=True, pagination_class=None)
181    def health(self, request: Request, pk: int) -> Response:
182        """Get outposts current health"""
183        outpost: Outpost = self.get_object()
184        states = []
185        for state in outpost.state:
186            states.append(
187                {
188                    "uid": state.uid,
189                    "last_seen": state.last_seen,
190                    "version": state.version,
191                    "version_should": state.version_should,
192                    "version_outdated": state.version_outdated,
193                    "build_hash": state.build_hash,
194                    "golang_version": state.golang_version,
195                    "openssl_enabled": state.openssl_enabled,
196                    "openssl_version": state.openssl_version,
197                    "fips_enabled": state.fips_enabled,
198                    "hostname": state.hostname,
199                    "build_hash_should": authentik_build_hash(),
200                }
201            )
202        return Response(OutpostHealthSerializer(states, many=True).data)
203
204    @extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
205    @action(detail=False, methods=["GET"])
206    def default_settings(self, request: Request) -> Response:
207        """Global default outpost config"""
208        host = self.request.build_absolute_uri("/")
209        return Response({"config": default_outpost_config(host)})
class OutpostSerializer(authentik.core.api.utils.ModelSerializer):
 38class OutpostSerializer(ModelSerializer):
 39    """Outpost Serializer"""
 40
 41    config = JSONDictField(source="_config")
 42    # Need to set allow_empty=True for the embedded outpost with no providers
 43    # is checked for other providers in the API Viewset
 44    providers = PrimaryKeyRelatedField(
 45        allow_empty=True,
 46        many=True,
 47        queryset=Provider.objects.select_subclasses().all(),
 48    )
 49    providers_obj = ProviderSerializer(source="providers", many=True, read_only=True)
 50    service_connection_obj = ServiceConnectionSerializer(
 51        source="service_connection",
 52        read_only=True,
 53        allow_null=True,
 54    )
 55    refresh_interval_s = SerializerMethodField()
 56
 57    def get_refresh_interval_s(self, obj: Outpost) -> int:
 58        return int(timedelta_from_string(obj.config.refresh_interval).total_seconds())
 59
 60    def validate_name(self, name: str) -> str:
 61        """Validate name (especially for embedded outpost)"""
 62        if not self.instance:
 63            return name
 64        if self.instance.managed == MANAGED_OUTPOST and name != MANAGED_OUTPOST_NAME:
 65            raise ValidationError("Embedded outpost's name cannot be changed")
 66        if self.instance.name == MANAGED_OUTPOST_NAME:
 67            self.instance.managed = MANAGED_OUTPOST
 68        return name
 69
 70    def validate_providers(self, providers: list[Provider]) -> list[Provider]:
 71        """Check that all providers match the type of the outpost"""
 72        type_map = {
 73            OutpostType.LDAP: LDAPProvider,
 74            OutpostType.PROXY: ProxyProvider,
 75            OutpostType.RADIUS: RadiusProvider,
 76            OutpostType.RAC: RACProvider,
 77            None: Provider,
 78        }
 79        for provider in providers:
 80            if not isinstance(provider, type_map[self.initial_data.get("type")]):
 81                raise ValidationError(
 82                    f"Outpost type {self.initial_data['type']} can't be used with "
 83                    f"{provider.__class__.__name__} providers."
 84                )
 85        if self.instance and self.instance.managed == MANAGED_OUTPOST:
 86            return providers
 87        if len(providers) < 1:
 88            raise ValidationError("This list may not be empty.")
 89        return providers
 90
 91    def validate_config(self, config) -> dict:
 92        """Check that the config has all required fields"""
 93        try:
 94            parsed = from_dict(OutpostConfig, config)
 95            timedelta_string_validator(parsed.refresh_interval)
 96        except DaciteError as exc:
 97            raise ValidationError(f"Failed to validate config: {str(exc)}") from exc
 98        return config
 99
100    class Meta:
101        model = Outpost
102        fields = [
103            "pk",
104            "name",
105            "type",
106            "providers",
107            "providers_obj",
108            "service_connection",
109            "service_connection_obj",
110            "refresh_interval_s",
111            "token_identifier",
112            "config",
113            "managed",
114        ]
115        extra_kwargs = {"type": {"required": True}}

Outpost Serializer

config
providers
providers_obj
service_connection_obj
refresh_interval_s
def get_refresh_interval_s(self, obj: authentik.outposts.models.Outpost) -> int:
57    def get_refresh_interval_s(self, obj: Outpost) -> int:
58        return int(timedelta_from_string(obj.config.refresh_interval).total_seconds())
def validate_name(self, name: str) -> str:
60    def validate_name(self, name: str) -> str:
61        """Validate name (especially for embedded outpost)"""
62        if not self.instance:
63            return name
64        if self.instance.managed == MANAGED_OUTPOST and name != MANAGED_OUTPOST_NAME:
65            raise ValidationError("Embedded outpost's name cannot be changed")
66        if self.instance.name == MANAGED_OUTPOST_NAME:
67            self.instance.managed = MANAGED_OUTPOST
68        return name

Validate name (especially for embedded outpost)

def validate_providers( self, providers: list[authentik.core.models.Provider]) -> list[authentik.core.models.Provider]:
70    def validate_providers(self, providers: list[Provider]) -> list[Provider]:
71        """Check that all providers match the type of the outpost"""
72        type_map = {
73            OutpostType.LDAP: LDAPProvider,
74            OutpostType.PROXY: ProxyProvider,
75            OutpostType.RADIUS: RadiusProvider,
76            OutpostType.RAC: RACProvider,
77            None: Provider,
78        }
79        for provider in providers:
80            if not isinstance(provider, type_map[self.initial_data.get("type")]):
81                raise ValidationError(
82                    f"Outpost type {self.initial_data['type']} can't be used with "
83                    f"{provider.__class__.__name__} providers."
84                )
85        if self.instance and self.instance.managed == MANAGED_OUTPOST:
86            return providers
87        if len(providers) < 1:
88            raise ValidationError("This list may not be empty.")
89        return providers

Check that all providers match the type of the outpost

def validate_config(self, config) -> dict:
91    def validate_config(self, config) -> dict:
92        """Check that the config has all required fields"""
93        try:
94            parsed = from_dict(OutpostConfig, config)
95            timedelta_string_validator(parsed.refresh_interval)
96        except DaciteError as exc:
97            raise ValidationError(f"Failed to validate config: {str(exc)}") from exc
98        return config

Check that the config has all required fields

class OutpostSerializer.Meta:
100    class Meta:
101        model = Outpost
102        fields = [
103            "pk",
104            "name",
105            "type",
106            "providers",
107            "providers_obj",
108            "service_connection",
109            "service_connection_obj",
110            "refresh_interval_s",
111            "token_identifier",
112            "config",
113            "managed",
114        ]
115        extra_kwargs = {"type": {"required": True}}
fields = ['pk', 'name', 'type', 'providers', 'providers_obj', 'service_connection', 'service_connection_obj', 'refresh_interval_s', 'token_identifier', 'config', 'managed']
extra_kwargs = {'type': {'required': True}}
class OutpostDefaultConfigSerializer(authentik.core.api.utils.PassiveSerializer):
118class OutpostDefaultConfigSerializer(PassiveSerializer):
119    """Global default outpost config"""
120
121    config = JSONDictField(read_only=True)

Global default outpost config

config
class OutpostHealthSerializer(authentik.core.api.utils.PassiveSerializer):
124class OutpostHealthSerializer(PassiveSerializer):
125    """Outpost health status"""
126
127    uid = CharField(read_only=True)
128    last_seen = DateTimeField(read_only=True)
129    version = CharField(read_only=True)
130    golang_version = CharField(read_only=True)
131    openssl_enabled = BooleanField(read_only=True)
132    openssl_version = CharField(read_only=True)
133    fips_enabled = SerializerMethodField()
134
135    version_should = CharField(read_only=True)
136    version_outdated = BooleanField(read_only=True)
137
138    build_hash = CharField(read_only=True, required=False)
139    build_hash_should = CharField(read_only=True, required=False)
140
141    hostname = CharField(read_only=True, required=False)
142
143    def get_fips_enabled(self, obj: dict) -> bool | None:
144        """Get FIPS enabled"""
145        if not fips_enabled():
146            return None
147        return obj["fips_enabled"]

Outpost health status

uid
last_seen
version
golang_version
openssl_enabled
openssl_version
fips_enabled
version_should
version_outdated
build_hash
build_hash_should
hostname
def get_fips_enabled(self, obj: dict) -> bool | None:
143    def get_fips_enabled(self, obj: dict) -> bool | None:
144        """Get FIPS enabled"""
145        if not fips_enabled():
146            return None
147        return obj["fips_enabled"]

Get FIPS enabled

class OutpostFilter(django_filters.filterset.FilterSet):
150class OutpostFilter(FilterSet):
151    """Filter for Outposts"""
152
153    providers_by_pk = ModelMultipleChoiceFilter(
154        field_name="providers",
155        queryset=Provider.objects.all(),
156    )
157
158    class Meta:
159        model = Outpost
160        fields = {
161            "providers": ["isnull"],
162            "name": ["iexact", "icontains"],
163            "service_connection__name": ["iexact", "icontains"],
164            "managed": ["iexact", "icontains"],
165        }

Filter for Outposts

providers_by_pk
declared_filters = OrderedDict({'providers_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>})
base_filters = OrderedDict({'providers__isnull': <django_filters.filters.BooleanFilter object>, 'name__iexact': <django_filters.filters.CharFilter object>, 'name__icontains': <django_filters.filters.CharFilter object>, 'service_connection__name__iexact': <django_filters.filters.CharFilter object>, 'service_connection__name__icontains': <django_filters.filters.CharFilter object>, 'managed__iexact': <django_filters.filters.CharFilter object>, 'managed__icontains': <django_filters.filters.CharFilter object>, 'providers_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>})
class OutpostFilter.Meta:
158    class Meta:
159        model = Outpost
160        fields = {
161            "providers": ["isnull"],
162            "name": ["iexact", "icontains"],
163            "service_connection__name": ["iexact", "icontains"],
164            "managed": ["iexact", "icontains"],
165        }
fields = {'providers': ['isnull'], 'name': ['iexact', 'icontains'], 'service_connection__name': ['iexact', 'icontains'], 'managed': ['iexact', 'icontains']}
class OutpostViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
168class OutpostViewSet(UsedByMixin, ModelViewSet):
169    """Outpost Viewset"""
170
171    queryset = Outpost.objects.all()
172    serializer_class = OutpostSerializer
173    filterset_class = OutpostFilter
174    search_fields = [
175        "name",
176        "providers__name",
177    ]
178    ordering = ["name", "service_connection__name"]
179
180    @extend_schema(responses={200: OutpostHealthSerializer(many=True)})
181    @action(methods=["GET"], detail=True, pagination_class=None)
182    def health(self, request: Request, pk: int) -> Response:
183        """Get outposts current health"""
184        outpost: Outpost = self.get_object()
185        states = []
186        for state in outpost.state:
187            states.append(
188                {
189                    "uid": state.uid,
190                    "last_seen": state.last_seen,
191                    "version": state.version,
192                    "version_should": state.version_should,
193                    "version_outdated": state.version_outdated,
194                    "build_hash": state.build_hash,
195                    "golang_version": state.golang_version,
196                    "openssl_enabled": state.openssl_enabled,
197                    "openssl_version": state.openssl_version,
198                    "fips_enabled": state.fips_enabled,
199                    "hostname": state.hostname,
200                    "build_hash_should": authentik_build_hash(),
201                }
202            )
203        return Response(OutpostHealthSerializer(states, many=True).data)
204
205    @extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
206    @action(detail=False, methods=["GET"])
207    def default_settings(self, request: Request) -> Response:
208        """Global default outpost config"""
209        host = self.request.build_absolute_uri("/")
210        return Response({"config": default_outpost_config(host)})

Outpost Viewset

queryset = <QuerySet []>
serializer_class = <class 'OutpostSerializer'>
filterset_class = <class 'OutpostFilter'>
search_fields = ['name', 'providers__name']
ordering = ['name', 'service_connection__name']
@extend_schema(responses={200: OutpostHealthSerializer(many=True)})
@action(methods=['GET'], detail=True, pagination_class=None)
def health( self, request: rest_framework.request.Request, pk: int) -> rest_framework.response.Response:
180    @extend_schema(responses={200: OutpostHealthSerializer(many=True)})
181    @action(methods=["GET"], detail=True, pagination_class=None)
182    def health(self, request: Request, pk: int) -> Response:
183        """Get outposts current health"""
184        outpost: Outpost = self.get_object()
185        states = []
186        for state in outpost.state:
187            states.append(
188                {
189                    "uid": state.uid,
190                    "last_seen": state.last_seen,
191                    "version": state.version,
192                    "version_should": state.version_should,
193                    "version_outdated": state.version_outdated,
194                    "build_hash": state.build_hash,
195                    "golang_version": state.golang_version,
196                    "openssl_enabled": state.openssl_enabled,
197                    "openssl_version": state.openssl_version,
198                    "fips_enabled": state.fips_enabled,
199                    "hostname": state.hostname,
200                    "build_hash_should": authentik_build_hash(),
201                }
202            )
203        return Response(OutpostHealthSerializer(states, many=True).data)

Get outposts current health

@extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
@action(detail=False, methods=['GET'])
def default_settings( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
205    @extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
206    @action(detail=False, methods=["GET"])
207    def default_settings(self, request: Request) -> Response:
208        """Global default outpost config"""
209        host = self.request.build_absolute_uri("/")
210        return Response({"config": default_outpost_config(host)})

Global default outpost config

name = None
description = None
suffix = None
detail = None
basename = None