authentik.outposts.tasks

outpost tasks

  1"""outpost tasks"""
  2
  3from hashlib import sha256
  4from os import R_OK, access
  5from pathlib import Path
  6from socket import gethostname
  7from typing import Any
  8from urllib.parse import urlparse
  9
 10from channels.layers import get_channel_layer
 11from django.core.cache import cache
 12from django.utils.translation import gettext_lazy as _
 13from docker.constants import DEFAULT_UNIX_SOCKET
 14from dramatiq.actor import actor
 15from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME
 16from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION
 17from structlog.stdlib import get_logger
 18from yaml import safe_load
 19
 20from authentik.lib.config import CONFIG
 21from authentik.outposts.consumer import build_outpost_group
 22from authentik.outposts.controllers.base import BaseController, ControllerException
 23from authentik.outposts.controllers.docker import DockerClient
 24from authentik.outposts.controllers.kubernetes import KubernetesClient
 25from authentik.outposts.models import (
 26    DockerServiceConnection,
 27    KubernetesServiceConnection,
 28    Outpost,
 29    OutpostServiceConnection,
 30    OutpostType,
 31    ServiceConnectionInvalid,
 32)
 33from authentik.providers.ldap.controllers.docker import LDAPDockerController
 34from authentik.providers.ldap.controllers.kubernetes import LDAPKubernetesController
 35from authentik.providers.proxy.controllers.docker import ProxyDockerController
 36from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController
 37from authentik.providers.rac.controllers.docker import RACDockerController
 38from authentik.providers.rac.controllers.kubernetes import RACKubernetesController
 39from authentik.providers.radius.controllers.docker import RadiusDockerController
 40from authentik.providers.radius.controllers.kubernetes import RadiusKubernetesController
 41from authentik.tasks.middleware import CurrentTask
 42
# Module-level structlog logger used by the tasks in this module.
LOGGER = get_logger()
# Cache key template, filled with an outpost pk. outpost_controller reads the outpost from
# this key (and deletes the key afterwards) when it is called with from_cache=True.
CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s"
 45
 46
 47def hash_session_key(session_key: str) -> str:
 48    """Hash the session key for sending session end signals"""
 49    return sha256(session_key.encode("ascii")).hexdigest()
 50
 51
 52def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None:
 53    """Get a controller for the outpost, when a service connection is defined"""
 54    if not outpost.service_connection:
 55        return None
 56    service_connection = outpost.service_connection
 57    if outpost.type == OutpostType.PROXY:
 58        if isinstance(service_connection, DockerServiceConnection):
 59            return ProxyDockerController
 60        if isinstance(service_connection, KubernetesServiceConnection):
 61            return ProxyKubernetesController
 62    if outpost.type == OutpostType.LDAP:
 63        if isinstance(service_connection, DockerServiceConnection):
 64            return LDAPDockerController
 65        if isinstance(service_connection, KubernetesServiceConnection):
 66            return LDAPKubernetesController
 67    if outpost.type == OutpostType.RADIUS:
 68        if isinstance(service_connection, DockerServiceConnection):
 69            return RadiusDockerController
 70        if isinstance(service_connection, KubernetesServiceConnection):
 71            return RadiusKubernetesController
 72    if outpost.type == OutpostType.RAC:
 73        if isinstance(service_connection, DockerServiceConnection):
 74            return RACDockerController
 75        if isinstance(service_connection, KubernetesServiceConnection):
 76            return RACKubernetesController
 77    return None
 78
 79
 80@actor(description=_("Update cached state of service connection."))
 81def outpost_service_connection_monitor(connection_pk: Any):
 82    """Update cached state of a service connection"""
 83    connection: OutpostServiceConnection = (
 84        OutpostServiceConnection.objects.filter(pk=connection_pk).select_subclasses().first()
 85    )
 86    if not connection:
 87        return
 88    cls = None
 89    if isinstance(connection, DockerServiceConnection):
 90        cls = DockerClient
 91    if isinstance(connection, KubernetesServiceConnection):
 92        cls = KubernetesClient
 93    if not cls:
 94        LOGGER.warning("No class found for service connection", connection=connection)
 95        return
 96    try:
 97        with cls(connection) as client:
 98            state = client.fetch_state()
 99    except ServiceConnectionInvalid as exc:
100        LOGGER.warning("Failed to get client status", exc=exc)
101        return
102    cache.set(connection.state_key, state, timeout=None)
103
104
@actor(description=_("Create/update/monitor/delete the deployment of an Outpost."))
def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = False):
    """Run a controller action against the deployment of an Outpost.

    ``action`` selects the controller method ``<action>_with_logs``. With
    ``from_cache`` set, the outpost is read from the cache entry built from
    ``CACHE_KEY_OUTPOST_DOWN`` instead of the database, and that entry is
    deleted once the action finished without a controller error.

    Controller and service connection errors are reported on the current task
    rather than raised. Nothing happens when the outpost cannot be found or no
    controller matches it.
    """
    task = CurrentTask.get_task()
    logs = []
    if not from_cache:
        outpost: Outpost = Outpost.objects.filter(pk=outpost_pk).first()
        LOGGER.debug("Getting outpost from DB")
    else:
        outpost: Outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
        LOGGER.debug("Getting outpost from cache to delete")
    if not outpost:
        LOGGER.warning("No outpost")
        return
    try:
        controller_cls = controller_for_outpost(outpost)
        if not controller_cls:
            return
        with controller_cls(outpost, outpost.service_connection) as controller:
            LOGGER.debug("---------------Outpost Controller logs starting----------------")
            run_action = getattr(controller, f"{action}_with_logs")
            logs = run_action()
            LOGGER.debug("-----------------Outpost Controller logs end-------------------")
    except (ControllerException, ServiceConnectionInvalid) as exc:
        task.error(exc)
        return
    # Success path only: drop the cached outpost and attach the controller logs.
    if from_cache:
        cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk)
    task.logs(logs)
133
134
@actor(description=_("Ensure that all Outposts have valid Service Accounts and Tokens."))
def outpost_token_ensurer():
    """
    Ensure that every Outpost has a valid Service Account and Token

    For each outpost the token attribute is read and the permissions of the
    outpost's user are rebuilt; the number of outposts handled is reported on
    the current task.
    """
    task = CurrentTask.get_task()
    outposts = Outpost.objects.all()
    for instance in outposts:
        # Only the attribute access matters here; the value is discarded.
        # NOTE(review): presumably reading `token` creates it when missing - confirm in the model.
        _token = instance.token
        instance.build_user_permissions(instance.user)
    task.info(f"Successfully checked {len(outposts)} Outposts.")
146
147
@actor(description=_("Send update to outpost"))
def outpost_send_update(pk: Any):
    """Send an update event to the channel group of a single outpost.

    Does nothing when no outpost with the given pk exists.
    """
    instance = Outpost.objects.filter(pk=pk).first()
    if not instance:
        return
    # This task runs whenever anything related to an OutpostModel is saved, so the token
    # and the user permissions are refreshed here to be sure they are right.
    _token = instance.token
    instance.build_user_permissions(instance.user)
    channel_layer = get_channel_layer()
    target_group = build_outpost_group(instance.pk)
    LOGGER.debug("sending update", channel=target_group, outpost=instance)
    update_event = {"type": "event.update"}
    channel_layer.group_send_blocking(target_group, update_event)
162
163
@actor(description=_("Checks the local environment and create Service connections."))
def outpost_connection_discovery():
    """Check the local environment and create missing Service connections.

    Three integrations are probed, in this order:

    * the in-cluster Kubernetes service account token file,
    * a kubeconfig file at the default location (for development),
    * the default local Docker unix socket, which must also be readable.

    A connection is only created when no matching one exists yet. Nothing is
    probed when the ``outposts.discover`` setting is false.
    """
    self = CurrentTask.get_task()
    if not CONFIG.get_bool("outposts.discover"):
        self.info("Outpost integration discovery is disabled")
        return
    # Explicitly check against token filename, as that's
    # only present when the integration is enabled
    if Path(SERVICE_TOKEN_FILENAME).exists():
        self.info("Detected in-cluster Kubernetes Config")
        if not KubernetesServiceConnection.objects.filter(local=True).exists():
            KubernetesServiceConnection.objects.create(
                name="Local Kubernetes Cluster", local=True, kubeconfig={}
            )
            # Logged after the create call so the message is only emitted on success.
            self.info("Created Service Connection for in-cluster")
    # For development, check for the existence of a kubeconfig file
    kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser()
    if kubeconfig_path.exists():
        self.info("Detected kubeconfig")
        # The connection is named after this host, so each host gets its own entry.
        kubeconfig_local_name = f"k8s-{gethostname()}"
        if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists():
            self.info("Creating kubeconfig Service Connection")
            with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig:
                KubernetesServiceConnection.objects.create(
                    name=kubeconfig_local_name,
                    kubeconfig=safe_load(_kubeconfig),
                )
    # DEFAULT_UNIX_SOCKET is a URL; only its path component is a filesystem path.
    unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path
    docker_socket = Path(unix_socket_path)
    if docker_socket.exists() and access(docker_socket, R_OK):
        self.info("Detected local docker socket")
        # exists() matches the Kubernetes checks above and avoids loading every row
        # just to count them.
        if not DockerServiceConnection.objects.filter(local=True).exists():
            DockerServiceConnection.objects.create(
                name="Local Docker connection",
                local=True,
                url=unix_socket_path,
            )
            # Logged after the create call so the message is only emitted on success.
            self.info("Created Service Connection for docker")
203
204
@actor(description=_("Terminate session on all outposts."))
def outpost_session_end(session_id: str):
    """Send a session end event for the given session to the group of every outpost.

    The session id is hashed with ``hash_session_key`` first; only the hash is
    put into the event.
    """
    channel_layer = get_channel_layer()
    session_hash = hash_session_key(session_id)
    for instance in Outpost.objects.all():
        LOGGER.info("Sending session end signal to outpost", outpost=instance)
        target_group = build_outpost_group(instance.pk)
        # A fresh event dict is built for every outpost.
        end_event = {
            "type": "event.session.end",
            "session_id": session_hash,
        }
        channel_layer.group_send_blocking(target_group, end_event)
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_KEY_OUTPOST_DOWN = 'goauthentik.io/outposts/teardown/%s'
def hash_session_key(session_key: str) -> str:
def hash_session_key(session_key: str) -> str:
    """Return the hex-encoded SHA-256 digest of a session key.

    The key is encoded as ASCII before hashing, so a key containing non-ASCII
    characters raises ``UnicodeEncodeError``. The digest is used in place of the
    raw key when sending session end signals.
    """
    digest = sha256()
    digest.update(session_key.encode("ascii"))
    return digest.hexdigest()

Hash the session key for sending session end signals

def controller_for_outpost( outpost: authentik.outposts.models.Outpost) -> type[authentik.outposts.controllers.base.BaseController] | None:
def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None:
    """Pick the controller class matching an outpost's type and service connection.

    Returns ``None`` when the outpost has no service connection, or when no
    controller is registered for the combination of outpost type and
    connection kind (Docker or Kubernetes).
    """
    connection = outpost.service_connection
    if not connection:
        return None
    # One row per outpost type: (type, Docker controller, Kubernetes controller).
    controllers_by_type = (
        (OutpostType.PROXY, ProxyDockerController, ProxyKubernetesController),
        (OutpostType.LDAP, LDAPDockerController, LDAPKubernetesController),
        (OutpostType.RADIUS, RadiusDockerController, RadiusKubernetesController),
        (OutpostType.RAC, RACDockerController, RACKubernetesController),
    )
    for outpost_type, docker_controller, kubernetes_controller in controllers_by_type:
        if outpost.type == outpost_type:
            # Docker is checked before Kubernetes, as in a plain if-chain.
            if isinstance(connection, DockerServiceConnection):
                return docker_controller
            if isinstance(connection, KubernetesServiceConnection):
                return kubernetes_controller
    return None

Get a controller for the outpost, when a service connection is defined

outpost_service_connection_monitor = Actor(<function outpost_service_connection_monitor>, queue_name='default', actor_name='outpost_service_connection_monitor')

Update cached state of a service connection

outpost_controller = Actor(<function outpost_controller>, queue_name='default', actor_name='outpost_controller')

Create/update/monitor/delete the deployment of an Outpost

outpost_token_ensurer = Actor(<function outpost_token_ensurer>, queue_name='default', actor_name='outpost_token_ensurer')

Periodically ensure that all Outposts have valid Service Accounts and Tokens

outpost_send_update = Actor(<function outpost_send_update>, queue_name='default', actor_name='outpost_send_update')

Update outpost instance

outpost_connection_discovery = Actor(<function outpost_connection_discovery>, queue_name='default', actor_name='outpost_connection_discovery')

Checks the local environment and creates Service connections.