authentik.outposts.api.outposts

Outpost API Views

  1"""Outpost API Views"""
  2
  3from dacite.core import from_dict
  4from dacite.exceptions import DaciteError
  5from django_filters.filters import ModelMultipleChoiceFilter
  6from django_filters.filterset import FilterSet
  7from drf_spectacular.utils import extend_schema
  8from rest_framework.decorators import action
  9from rest_framework.exceptions import ValidationError
 10from rest_framework.fields import BooleanField, CharField, DateTimeField, SerializerMethodField
 11from rest_framework.relations import PrimaryKeyRelatedField
 12from rest_framework.request import Request
 13from rest_framework.response import Response
 14from rest_framework.viewsets import ModelViewSet
 15
 16from authentik import authentik_build_hash
 17from authentik.admin.api.system import fips_enabled
 18from authentik.core.api.providers import ProviderSerializer
 19from authentik.core.api.used_by import UsedByMixin
 20from authentik.core.api.utils import JSONDictField, ModelSerializer, PassiveSerializer
 21from authentik.core.models import Provider
 22from authentik.lib.utils.time import timedelta_from_string, timedelta_string_validator
 23from authentik.outposts.api.service_connections import ServiceConnectionSerializer
 24from authentik.outposts.apps import MANAGED_OUTPOST, MANAGED_OUTPOST_NAME
 25from authentik.outposts.models import (
 26    OUR_VERSION,
 27    Outpost,
 28    OutpostConfig,
 29    OutpostType,
 30    default_outpost_config,
 31)
 32from authentik.providers.ldap.models import LDAPProvider
 33from authentik.providers.proxy.models import ProxyProvider
 34from authentik.providers.rac.models import RACProvider
 35from authentik.providers.radius.models import RadiusProvider
 36
 37
 38class OutpostSerializer(ModelSerializer):
 39    """Outpost Serializer"""
 40
 41    config = JSONDictField(source="_config")
 42    # Need to set allow_empty=True for the embedded outpost with no providers
 43    # is checked for other providers in the API Viewset
 44    providers = PrimaryKeyRelatedField(
 45        allow_empty=True,
 46        many=True,
 47        queryset=Provider.objects.select_subclasses().all(),
 48    )
 49    providers_obj = ProviderSerializer(source="providers", many=True, read_only=True)
 50    service_connection_obj = ServiceConnectionSerializer(
 51        source="service_connection",
 52        read_only=True,
 53        allow_null=True,
 54    )
 55    refresh_interval_s = SerializerMethodField()
 56
 57    def get_refresh_interval_s(self, obj: Outpost) -> int:
 58        return int(timedelta_from_string(obj.config.refresh_interval).total_seconds())
 59
 60    def validate_name(self, name: str) -> str:
 61        """Validate name (especially for embedded outpost)"""
 62        if not self.instance:
 63            return name
 64        if self.instance.managed == MANAGED_OUTPOST and name != MANAGED_OUTPOST_NAME:
 65            raise ValidationError("Embedded outpost's name cannot be changed")
 66        if self.instance.name == MANAGED_OUTPOST_NAME:
 67            self.instance.managed = MANAGED_OUTPOST
 68        return name
 69
 70    def validate_providers(self, providers: list[Provider]) -> list[Provider]:
 71        """Check that all providers match the type of the outpost"""
 72        type_map = {
 73            OutpostType.LDAP: LDAPProvider,
 74            OutpostType.PROXY: ProxyProvider,
 75            OutpostType.RADIUS: RadiusProvider,
 76            OutpostType.RAC: RACProvider,
 77            None: Provider,
 78        }
 79        for provider in providers:
 80            if not isinstance(provider, type_map[self.initial_data.get("type")]):
 81                raise ValidationError(
 82                    f"Outpost type {self.initial_data['type']} can't be used with "
 83                    f"{provider.__class__.__name__} providers."
 84                )
 85        if self.instance and self.instance.managed == MANAGED_OUTPOST:
 86            return providers
 87        if len(providers) < 1:
 88            raise ValidationError("This list may not be empty.")
 89        return providers
 90
 91    def validate_config(self, config) -> dict:
 92        """Check that the config has all required fields"""
 93        try:
 94            parsed = from_dict(OutpostConfig, config)
 95            timedelta_string_validator(parsed.refresh_interval)
 96        except DaciteError as exc:
 97            raise ValidationError(f"Failed to validate config: {str(exc)}") from exc
 98        return config
 99
100    class Meta:
101        model = Outpost
102        fields = [
103            "pk",
104            "name",
105            "type",
106            "providers",
107            "providers_obj",
108            "service_connection",
109            "service_connection_obj",
110            "refresh_interval_s",
111            "token_identifier",
112            "config",
113            "managed",
114        ]
115        extra_kwargs = {"type": {"required": True}}
116
117
class OutpostDefaultConfigSerializer(PassiveSerializer):
    """Response shape for the global default outpost config"""

    # Output only: the default config dictionary.
    config = JSONDictField(read_only=True)
122
123
class OutpostHealthSerializer(PassiveSerializer):
    """Health status reported for a single outpost instance"""

    # Identity and runtime information of the reporting instance
    uid = CharField(read_only=True)
    last_seen = DateTimeField(read_only=True)
    version = CharField(read_only=True)
    golang_version = CharField(read_only=True)
    openssl_enabled = BooleanField(read_only=True)
    openssl_version = CharField(read_only=True)
    fips_enabled = SerializerMethodField()

    # Version comparison
    version_should = CharField(read_only=True)
    version_outdated = BooleanField(read_only=True)

    # Build hash comparison
    build_hash = CharField(read_only=True, required=False)
    build_hash_should = CharField(read_only=True, required=False)

    hostname = CharField(read_only=True, required=False)

    def get_fips_enabled(self, obj: dict) -> bool | None:
        """Return the reported FIPS state, or None when `fips_enabled()` is falsy"""
        # `fips_enabled` here is the module-level helper, not the field above.
        return obj["fips_enabled"] if fips_enabled() else None
148
149
class OutpostFilter(FilterSet):
    """Filter set used when listing outposts"""

    # Select outposts by the primary keys of their assigned providers.
    providers_by_pk = ModelMultipleChoiceFilter(
        queryset=Provider.objects.all(),
        field_name="providers",
    )

    class Meta:
        model = Outpost
        # Model field -> lookups generated for it
        fields = {
            "providers": [
                "isnull",
            ],
            "name": [
                "iexact",
                "icontains",
            ],
            "service_connection__name": [
                "iexact",
                "icontains",
            ],
            "managed": [
                "iexact",
                "icontains",
            ],
        }
166
167
class OutpostViewSet(UsedByMixin, ModelViewSet):
    """Outpost Viewset"""

    queryset = Outpost.objects.all()
    serializer_class = OutpostSerializer
    filterset_class = OutpostFilter
    search_fields = ["name", "providers__name"]
    ordering = ["name", "service_connection__name"]

    @extend_schema(responses={200: OutpostHealthSerializer(many=True)})
    @action(methods=["GET"], detail=True, pagination_class=None)
    def health(self, request: Request, pk: int) -> Response:
        """Get outposts current health"""
        outpost: Outpost = self.get_object()
        # One entry per state of the outpost, combining the reported values with
        # the version and build hash this server expects.
        health_states = [
            {
                "uid": state.uid,
                "last_seen": state.last_seen,
                "version": state.version,
                "version_should": OUR_VERSION,
                "version_outdated": state.version_outdated,
                "build_hash": state.build_hash,
                "golang_version": state.golang_version,
                "openssl_enabled": state.openssl_enabled,
                "openssl_version": state.openssl_version,
                "fips_enabled": state.fips_enabled,
                "hostname": state.hostname,
                "build_hash_should": authentik_build_hash(),
            }
            for state in outpost.state
        ]
        serializer = OutpostHealthSerializer(health_states, many=True)
        return Response(serializer.data)

    @extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
    @action(detail=False, methods=["GET"])
    def default_settings(self, request: Request) -> Response:
        """Global default outpost config"""
        # The default config is built for the absolute URI of "/" on this request.
        return Response({"config": default_outpost_config(self.request.build_absolute_uri("/"))})
class OutpostSerializer(authentik.core.api.utils.ModelSerializer):
 39class OutpostSerializer(ModelSerializer):
 40    """Outpost Serializer"""
 41
 42    config = JSONDictField(source="_config")
 43    # Need to set allow_empty=True for the embedded outpost with no providers
 44    # is checked for other providers in the API Viewset
 45    providers = PrimaryKeyRelatedField(
 46        allow_empty=True,
 47        many=True,
 48        queryset=Provider.objects.select_subclasses().all(),
 49    )
 50    providers_obj = ProviderSerializer(source="providers", many=True, read_only=True)
 51    service_connection_obj = ServiceConnectionSerializer(
 52        source="service_connection",
 53        read_only=True,
 54        allow_null=True,
 55    )
 56    refresh_interval_s = SerializerMethodField()
 57
 58    def get_refresh_interval_s(self, obj: Outpost) -> int:
 59        return int(timedelta_from_string(obj.config.refresh_interval).total_seconds())
 60
 61    def validate_name(self, name: str) -> str:
 62        """Validate name (especially for embedded outpost)"""
 63        if not self.instance:
 64            return name
 65        if self.instance.managed == MANAGED_OUTPOST and name != MANAGED_OUTPOST_NAME:
 66            raise ValidationError("Embedded outpost's name cannot be changed")
 67        if self.instance.name == MANAGED_OUTPOST_NAME:
 68            self.instance.managed = MANAGED_OUTPOST
 69        return name
 70
 71    def validate_providers(self, providers: list[Provider]) -> list[Provider]:
 72        """Check that all providers match the type of the outpost"""
 73        type_map = {
 74            OutpostType.LDAP: LDAPProvider,
 75            OutpostType.PROXY: ProxyProvider,
 76            OutpostType.RADIUS: RadiusProvider,
 77            OutpostType.RAC: RACProvider,
 78            None: Provider,
 79        }
 80        for provider in providers:
 81            if not isinstance(provider, type_map[self.initial_data.get("type")]):
 82                raise ValidationError(
 83                    f"Outpost type {self.initial_data['type']} can't be used with "
 84                    f"{provider.__class__.__name__} providers."
 85                )
 86        if self.instance and self.instance.managed == MANAGED_OUTPOST:
 87            return providers
 88        if len(providers) < 1:
 89            raise ValidationError("This list may not be empty.")
 90        return providers
 91
 92    def validate_config(self, config) -> dict:
 93        """Check that the config has all required fields"""
 94        try:
 95            parsed = from_dict(OutpostConfig, config)
 96            timedelta_string_validator(parsed.refresh_interval)
 97        except DaciteError as exc:
 98            raise ValidationError(f"Failed to validate config: {str(exc)}") from exc
 99        return config
100
101    class Meta:
102        model = Outpost
103        fields = [
104            "pk",
105            "name",
106            "type",
107            "providers",
108            "providers_obj",
109            "service_connection",
110            "service_connection_obj",
111            "refresh_interval_s",
112            "token_identifier",
113            "config",
114            "managed",
115        ]
116        extra_kwargs = {"type": {"required": True}}

Outpost Serializer

config
providers
providers_obj
service_connection_obj
refresh_interval_s
def get_refresh_interval_s(self, obj: authentik.outposts.models.Outpost) -> int:
58    def get_refresh_interval_s(self, obj: Outpost) -> int:
59        return int(timedelta_from_string(obj.config.refresh_interval).total_seconds())
def validate_name(self, name: str) -> str:
61    def validate_name(self, name: str) -> str:
62        """Validate name (especially for embedded outpost)"""
63        if not self.instance:
64            return name
65        if self.instance.managed == MANAGED_OUTPOST and name != MANAGED_OUTPOST_NAME:
66            raise ValidationError("Embedded outpost's name cannot be changed")
67        if self.instance.name == MANAGED_OUTPOST_NAME:
68            self.instance.managed = MANAGED_OUTPOST
69        return name

Validate name (especially for embedded outpost)

def validate_providers( self, providers: list[authentik.core.models.Provider]) -> list[authentik.core.models.Provider]:
71    def validate_providers(self, providers: list[Provider]) -> list[Provider]:
72        """Check that all providers match the type of the outpost"""
73        type_map = {
74            OutpostType.LDAP: LDAPProvider,
75            OutpostType.PROXY: ProxyProvider,
76            OutpostType.RADIUS: RadiusProvider,
77            OutpostType.RAC: RACProvider,
78            None: Provider,
79        }
80        for provider in providers:
81            if not isinstance(provider, type_map[self.initial_data.get("type")]):
82                raise ValidationError(
83                    f"Outpost type {self.initial_data['type']} can't be used with "
84                    f"{provider.__class__.__name__} providers."
85                )
86        if self.instance and self.instance.managed == MANAGED_OUTPOST:
87            return providers
88        if len(providers) < 1:
89            raise ValidationError("This list may not be empty.")
90        return providers

Check that all providers match the type of the outpost

def validate_config(self, config) -> dict:
92    def validate_config(self, config) -> dict:
93        """Check that the config has all required fields"""
94        try:
95            parsed = from_dict(OutpostConfig, config)
96            timedelta_string_validator(parsed.refresh_interval)
97        except DaciteError as exc:
98            raise ValidationError(f"Failed to validate config: {str(exc)}") from exc
99        return config

Check that the config has all required fields

class OutpostSerializer.Meta:
101    class Meta:
102        model = Outpost
103        fields = [
104            "pk",
105            "name",
106            "type",
107            "providers",
108            "providers_obj",
109            "service_connection",
110            "service_connection_obj",
111            "refresh_interval_s",
112            "token_identifier",
113            "config",
114            "managed",
115        ]
116        extra_kwargs = {"type": {"required": True}}
fields = ['pk', 'name', 'type', 'providers', 'providers_obj', 'service_connection', 'service_connection_obj', 'refresh_interval_s', 'token_identifier', 'config', 'managed']
extra_kwargs = {'type': {'required': True}}
class OutpostDefaultConfigSerializer(authentik.core.api.utils.PassiveSerializer):
119class OutpostDefaultConfigSerializer(PassiveSerializer):
120    """Global default outpost config"""
121
122    config = JSONDictField(read_only=True)

Global default outpost config

config
class OutpostHealthSerializer(authentik.core.api.utils.PassiveSerializer):
125class OutpostHealthSerializer(PassiveSerializer):
126    """Outpost health status"""
127
128    uid = CharField(read_only=True)
129    last_seen = DateTimeField(read_only=True)
130    version = CharField(read_only=True)
131    golang_version = CharField(read_only=True)
132    openssl_enabled = BooleanField(read_only=True)
133    openssl_version = CharField(read_only=True)
134    fips_enabled = SerializerMethodField()
135
136    version_should = CharField(read_only=True)
137    version_outdated = BooleanField(read_only=True)
138
139    build_hash = CharField(read_only=True, required=False)
140    build_hash_should = CharField(read_only=True, required=False)
141
142    hostname = CharField(read_only=True, required=False)
143
144    def get_fips_enabled(self, obj: dict) -> bool | None:
145        """Get FIPS enabled"""
146        if not fips_enabled():
147            return None
148        return obj["fips_enabled"]

Outpost health status

uid
last_seen
version
golang_version
openssl_enabled
openssl_version
fips_enabled
version_should
version_outdated
build_hash
build_hash_should
hostname
def get_fips_enabled(self, obj: dict) -> bool | None:
144    def get_fips_enabled(self, obj: dict) -> bool | None:
145        """Get FIPS enabled"""
146        if not fips_enabled():
147            return None
148        return obj["fips_enabled"]

Get FIPS enabled

class OutpostFilter(django_filters.filterset.FilterSet):
151class OutpostFilter(FilterSet):
152    """Filter for Outposts"""
153
154    providers_by_pk = ModelMultipleChoiceFilter(
155        field_name="providers",
156        queryset=Provider.objects.all(),
157    )
158
159    class Meta:
160        model = Outpost
161        fields = {
162            "providers": ["isnull"],
163            "name": ["iexact", "icontains"],
164            "service_connection__name": ["iexact", "icontains"],
165            "managed": ["iexact", "icontains"],
166        }

Filter for Outposts

providers_by_pk
declared_filters = OrderedDict({'providers_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>})
base_filters = OrderedDict({'providers__isnull': <django_filters.filters.BooleanFilter object>, 'name__iexact': <django_filters.filters.CharFilter object>, 'name__icontains': <django_filters.filters.CharFilter object>, 'service_connection__name__iexact': <django_filters.filters.CharFilter object>, 'service_connection__name__icontains': <django_filters.filters.CharFilter object>, 'managed__iexact': <django_filters.filters.CharFilter object>, 'managed__icontains': <django_filters.filters.CharFilter object>, 'providers_by_pk': <django_filters.filters.ModelMultipleChoiceFilter object>})
class OutpostFilter.Meta:
159    class Meta:
160        model = Outpost
161        fields = {
162            "providers": ["isnull"],
163            "name": ["iexact", "icontains"],
164            "service_connection__name": ["iexact", "icontains"],
165            "managed": ["iexact", "icontains"],
166        }
fields = {'providers': ['isnull'], 'name': ['iexact', 'icontains'], 'service_connection__name': ['iexact', 'icontains'], 'managed': ['iexact', 'icontains']}
class OutpostViewSet(authentik.core.api.used_by.UsedByMixin, rest_framework.viewsets.ModelViewSet):
169class OutpostViewSet(UsedByMixin, ModelViewSet):
170    """Outpost Viewset"""
171
172    queryset = Outpost.objects.all()
173    serializer_class = OutpostSerializer
174    filterset_class = OutpostFilter
175    search_fields = [
176        "name",
177        "providers__name",
178    ]
179    ordering = ["name", "service_connection__name"]
180
181    @extend_schema(responses={200: OutpostHealthSerializer(many=True)})
182    @action(methods=["GET"], detail=True, pagination_class=None)
183    def health(self, request: Request, pk: int) -> Response:
184        """Get outposts current health"""
185        outpost: Outpost = self.get_object()
186        states = []
187        for state in outpost.state:
188            states.append(
189                {
190                    "uid": state.uid,
191                    "last_seen": state.last_seen,
192                    "version": state.version,
193                    "version_should": OUR_VERSION,
194                    "version_outdated": state.version_outdated,
195                    "build_hash": state.build_hash,
196                    "golang_version": state.golang_version,
197                    "openssl_enabled": state.openssl_enabled,
198                    "openssl_version": state.openssl_version,
199                    "fips_enabled": state.fips_enabled,
200                    "hostname": state.hostname,
201                    "build_hash_should": authentik_build_hash(),
202                }
203            )
204        return Response(OutpostHealthSerializer(states, many=True).data)
205
206    @extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
207    @action(detail=False, methods=["GET"])
208    def default_settings(self, request: Request) -> Response:
209        """Global default outpost config"""
210        host = self.request.build_absolute_uri("/")
211        return Response({"config": default_outpost_config(host)})

Outpost Viewset

queryset = <QuerySet []>
serializer_class = <class 'OutpostSerializer'>
filterset_class = <class 'OutpostFilter'>
search_fields = ['name', 'providers__name']
ordering = ['name', 'service_connection__name']
@extend_schema(responses={200: OutpostHealthSerializer(many=True)})
@action(methods=['GET'], detail=True, pagination_class=None)
def health( self, request: rest_framework.request.Request, pk: int) -> rest_framework.response.Response:
181    @extend_schema(responses={200: OutpostHealthSerializer(many=True)})
182    @action(methods=["GET"], detail=True, pagination_class=None)
183    def health(self, request: Request, pk: int) -> Response:
184        """Get outposts current health"""
185        outpost: Outpost = self.get_object()
186        states = []
187        for state in outpost.state:
188            states.append(
189                {
190                    "uid": state.uid,
191                    "last_seen": state.last_seen,
192                    "version": state.version,
193                    "version_should": OUR_VERSION,
194                    "version_outdated": state.version_outdated,
195                    "build_hash": state.build_hash,
196                    "golang_version": state.golang_version,
197                    "openssl_enabled": state.openssl_enabled,
198                    "openssl_version": state.openssl_version,
199                    "fips_enabled": state.fips_enabled,
200                    "hostname": state.hostname,
201                    "build_hash_should": authentik_build_hash(),
202                }
203            )
204        return Response(OutpostHealthSerializer(states, many=True).data)

Get outposts current health

@extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
@action(detail=False, methods=['GET'])
def default_settings( self, request: rest_framework.request.Request) -> rest_framework.response.Response:
206    @extend_schema(responses={200: OutpostDefaultConfigSerializer(many=False)})
207    @action(detail=False, methods=["GET"])
208    def default_settings(self, request: Request) -> Response:
209        """Global default outpost config"""
210        host = self.request.build_absolute_uri("/")
211        return Response({"config": default_outpost_config(host)})

Global default outpost config

name = None
description = None
suffix = None
detail = None
basename = None