authentik.outposts.tasks
outpost tasks
1"""outpost tasks""" 2 3from hashlib import sha256 4from os import R_OK, access 5from pathlib import Path 6from socket import gethostname 7from typing import Any 8from urllib.parse import urlparse 9 10from channels.layers import get_channel_layer 11from django.core.cache import cache 12from django.utils.translation import gettext_lazy as _ 13from docker.constants import DEFAULT_UNIX_SOCKET 14from dramatiq.actor import actor 15from kubernetes.config.incluster_config import SERVICE_TOKEN_FILENAME 16from kubernetes.config.kube_config import KUBE_CONFIG_DEFAULT_LOCATION 17from structlog.stdlib import get_logger 18from yaml import safe_load 19 20from authentik.lib.config import CONFIG 21from authentik.outposts.consumer import build_outpost_group 22from authentik.outposts.controllers.base import BaseController, ControllerException 23from authentik.outposts.controllers.docker import DockerClient 24from authentik.outposts.controllers.kubernetes import KubernetesClient 25from authentik.outposts.models import ( 26 DockerServiceConnection, 27 KubernetesServiceConnection, 28 Outpost, 29 OutpostServiceConnection, 30 OutpostType, 31 ServiceConnectionInvalid, 32) 33from authentik.providers.ldap.controllers.docker import LDAPDockerController 34from authentik.providers.ldap.controllers.kubernetes import LDAPKubernetesController 35from authentik.providers.proxy.controllers.docker import ProxyDockerController 36from authentik.providers.proxy.controllers.kubernetes import ProxyKubernetesController 37from authentik.providers.rac.controllers.docker import RACDockerController 38from authentik.providers.rac.controllers.kubernetes import RACKubernetesController 39from authentik.providers.radius.controllers.docker import RadiusDockerController 40from authentik.providers.radius.controllers.kubernetes import RadiusKubernetesController 41from authentik.tasks.middleware import CurrentTask 42 43LOGGER = get_logger() 44CACHE_KEY_OUTPOST_DOWN = "goauthentik.io/outposts/teardown/%s" 45 46 47def hash_session_key(session_key: str) -> str: 48 """Hash the session key for sending session end signals""" 49 return sha256(session_key.encode("ascii")).hexdigest() 50 51 52def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None: 53 """Get a controller for the outpost, when a service connection is defined""" 54 if not outpost.service_connection: 55 return None 56 service_connection = outpost.service_connection 57 if outpost.type == OutpostType.PROXY: 58 if isinstance(service_connection, DockerServiceConnection): 59 return ProxyDockerController 60 if isinstance(service_connection, KubernetesServiceConnection): 61 return ProxyKubernetesController 62 if outpost.type == OutpostType.LDAP: 63 if isinstance(service_connection, DockerServiceConnection): 64 return LDAPDockerController 65 if isinstance(service_connection, KubernetesServiceConnection): 66 return LDAPKubernetesController 67 if outpost.type == OutpostType.RADIUS: 68 if isinstance(service_connection, DockerServiceConnection): 69 return RadiusDockerController 70 if isinstance(service_connection, KubernetesServiceConnection): 71 return RadiusKubernetesController 72 if outpost.type == OutpostType.RAC: 73 if isinstance(service_connection, DockerServiceConnection): 74 return RACDockerController 75 if isinstance(service_connection, KubernetesServiceConnection): 76 return RACKubernetesController 77 return None 78 79 80@actor(description=_("Update cached state of service connection.")) 81def outpost_service_connection_monitor(connection_pk: Any): 82 """Update cached state of a service connection""" 83 connection: OutpostServiceConnection = ( 84 OutpostServiceConnection.objects.filter(pk=connection_pk).select_subclasses().first() 85 ) 86 if not connection: 87 return 88 cls = None 89 if isinstance(connection, DockerServiceConnection): 90 cls = DockerClient 91 if isinstance(connection, KubernetesServiceConnection): 92 cls = KubernetesClient 93 if not cls: 94 LOGGER.warning("No class found for service connection", connection=connection) 95 return 96 try: 97 with cls(connection) as client: 98 state = client.fetch_state() 99 except ServiceConnectionInvalid as exc: 100 LOGGER.warning("Failed to get client status", exc=exc) 101 return 102 cache.set(connection.state_key, state, timeout=None) 103 104 105@actor(description=_("Create/update/monitor/delete the deployment of an Outpost.")) 106def outpost_controller(outpost_pk: str, action: str = "up", from_cache: bool = False): 107 """Create/update/monitor/delete the deployment of an Outpost""" 108 self = CurrentTask.get_task() 109 logs = [] 110 if from_cache: 111 outpost: Outpost = cache.get(CACHE_KEY_OUTPOST_DOWN % outpost_pk) 112 LOGGER.debug("Getting outpost from cache to delete") 113 else: 114 outpost: Outpost = Outpost.objects.filter(pk=outpost_pk).first() 115 LOGGER.debug("Getting outpost from DB") 116 if not outpost: 117 LOGGER.warning("No outpost") 118 return 119 try: 120 controller_type = controller_for_outpost(outpost) 121 if not controller_type: 122 return 123 with controller_type(outpost, outpost.service_connection) as controller: 124 LOGGER.debug("---------------Outpost Controller logs starting----------------") 125 logs = getattr(controller, f"{action}_with_logs")() 126 LOGGER.debug("-----------------Outpost Controller logs end-------------------") 127 except (ControllerException, ServiceConnectionInvalid) as exc: 128 self.error(exc) 129 else: 130 if from_cache: 131 cache.delete(CACHE_KEY_OUTPOST_DOWN % outpost_pk) 132 self.logs(logs) 133 134 135@actor(description=_("Ensure that all Outposts have valid Service Accounts and Tokens.")) 136def outpost_token_ensurer(): 137 """ 138 Periodically ensure that all Outposts have valid Service Accounts and Tokens 139 """ 140 self = CurrentTask.get_task() 141 all_outposts = Outpost.objects.all() 142 for outpost in all_outposts: 143 _ = outpost.token 144 outpost.build_user_permissions(outpost.user) 145 self.info(f"Successfully checked {len(all_outposts)} Outposts.") 146 147 148@actor(description=_("Send update to outpost")) 149def outpost_send_update(pk: Any): 150 """Update outpost instance""" 151 outpost = Outpost.objects.filter(pk=pk).first() 152 if not outpost: 153 return 154 # Ensure token again, because this function is called when anything related to an 155 # OutpostModel is saved, so we can be sure permissions are right 156 _ = outpost.token 157 outpost.build_user_permissions(outpost.user) 158 layer = get_channel_layer() 159 group = build_outpost_group(outpost.pk) 160 LOGGER.debug("sending update", channel=group, outpost=outpost) 161 layer.group_send_blocking(group, {"type": "event.update"}) 162 163 164@actor(description=_("Checks the local environment and create Service connections.")) 165def outpost_connection_discovery(): 166 """Checks the local environment and create Service connections.""" 167 self = CurrentTask.get_task() 168 if not CONFIG.get_bool("outposts.discover"): 169 self.info("Outpost integration discovery is disabled") 170 return 171 # Explicitly check against token filename, as that's 172 # only present when the integration is enabled 173 if Path(SERVICE_TOKEN_FILENAME).exists(): 174 self.info("Detected in-cluster Kubernetes Config") 175 if not KubernetesServiceConnection.objects.filter(local=True).exists(): 176 self.info("Created Service Connection for in-cluster") 177 KubernetesServiceConnection.objects.create( 178 name="Local Kubernetes Cluster", local=True, kubeconfig={} 179 ) 180 # For development, check for the existence of a kubeconfig file 181 kubeconfig_path = Path(KUBE_CONFIG_DEFAULT_LOCATION).expanduser() 182 if kubeconfig_path.exists(): 183 self.info("Detected kubeconfig") 184 kubeconfig_local_name = f"k8s-{gethostname()}" 185 if not KubernetesServiceConnection.objects.filter(name=kubeconfig_local_name).exists(): 186 self.info("Creating kubeconfig Service Connection") 187 with kubeconfig_path.open("r", encoding="utf8") as _kubeconfig: 188 KubernetesServiceConnection.objects.create( 189 name=kubeconfig_local_name, 190 kubeconfig=safe_load(_kubeconfig), 191 ) 192 unix_socket_path = urlparse(DEFAULT_UNIX_SOCKET).path 193 socket = Path(unix_socket_path) 194 if socket.exists() and access(socket, R_OK): 195 self.info("Detected local docker socket") 196 if len(DockerServiceConnection.objects.filter(local=True)) == 0: 197 self.info("Created Service Connection for docker") 198 DockerServiceConnection.objects.create( 199 name="Local Docker connection", 200 local=True, 201 url=unix_socket_path, 202 ) 203 204 205@actor(description=_("Terminate session on all outposts.")) 206def outpost_session_end(session_id: str): 207 layer = get_channel_layer() 208 hashed_session_id = hash_session_key(session_id) 209 for outpost in Outpost.objects.all(): 210 LOGGER.info("Sending session end signal to outpost", outpost=outpost) 211 group = build_outpost_group(outpost.pk) 212 layer.group_send_blocking( 213 group, 214 { 215 "type": "event.session.end", 216 "session_id": hashed_session_id, 217 }, 218 )
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_KEY_OUTPOST_DOWN =
'goauthentik.io/outposts/teardown/%s'
def
hash_session_key(session_key: str) -> str:
48def hash_session_key(session_key: str) -> str: 49 """Hash the session key for sending session end signals""" 50 return sha256(session_key.encode("ascii")).hexdigest()
Hash the session key for sending session end signals
def
controller_for_outpost( outpost: authentik.outposts.models.Outpost) -> type[authentik.outposts.controllers.base.BaseController] | None:
53def controller_for_outpost(outpost: Outpost) -> type[BaseController] | None: 54 """Get a controller for the outpost, when a service connection is defined""" 55 if not outpost.service_connection: 56 return None 57 service_connection = outpost.service_connection 58 if outpost.type == OutpostType.PROXY: 59 if isinstance(service_connection, DockerServiceConnection): 60 return ProxyDockerController 61 if isinstance(service_connection, KubernetesServiceConnection): 62 return ProxyKubernetesController 63 if outpost.type == OutpostType.LDAP: 64 if isinstance(service_connection, DockerServiceConnection): 65 return LDAPDockerController 66 if isinstance(service_connection, KubernetesServiceConnection): 67 return LDAPKubernetesController 68 if outpost.type == OutpostType.RADIUS: 69 if isinstance(service_connection, DockerServiceConnection): 70 return RadiusDockerController 71 if isinstance(service_connection, KubernetesServiceConnection): 72 return RadiusKubernetesController 73 if outpost.type == OutpostType.RAC: 74 if isinstance(service_connection, DockerServiceConnection): 75 return RACDockerController 76 if isinstance(service_connection, KubernetesServiceConnection): 77 return RACKubernetesController 78 return None
Get a controller for the outpost, when a service connection is defined
outpost_service_connection_monitor =
Actor(<function outpost_service_connection_monitor>, queue_name='default', actor_name='outpost_service_connection_monitor')
Update cached state of a service connection
outpost_controller =
Actor(<function outpost_controller>, queue_name='default', actor_name='outpost_controller')
Create/update/monitor/delete the deployment of an Outpost
outpost_token_ensurer =
Actor(<function outpost_token_ensurer>, queue_name='default', actor_name='outpost_token_ensurer')
Periodically ensure that all Outposts have valid Service Accounts and Tokens
outpost_send_update =
Actor(<function outpost_send_update>, queue_name='default', actor_name='outpost_send_update')
Update outpost instance
outpost_connection_discovery =
Actor(<function outpost_connection_discovery>, queue_name='default', actor_name='outpost_connection_discovery')
Checks the local environment and create Service connections.