authentik.outposts.controllers.base
Base Controller
1"""Base Controller""" 2 3from dataclasses import dataclass 4 5from structlog.stdlib import get_logger 6 7from authentik import authentik_build_hash, authentik_version 8from authentik.events.logs import LogEvent, capture_logs 9from authentik.lib.config import CONFIG 10from authentik.lib.sentry import SentryIgnoredException 11from authentik.outposts.models import ( 12 Outpost, 13 OutpostServiceConnection, 14 OutpostServiceConnectionState, 15) 16 17FIELD_MANAGER = "goauthentik.io" 18 19 20class ControllerException(SentryIgnoredException): 21 """Exception raised when anything fails during controller run""" 22 23 24@dataclass 25class DeploymentPort: 26 """Info about deployment's single port.""" 27 28 port: int 29 name: str 30 protocol: str 31 inner_port: int | None = None 32 33 34class BaseClient: 35 """Base class for custom clients""" 36 37 def fetch_state(self) -> OutpostServiceConnectionState: 38 """Get state, version info""" 39 raise NotImplementedError 40 41 def __enter__(self): 42 return self 43 44 def __exit__(self, exc_type, exc_value, traceback): 45 """Cleanup after usage""" 46 47 48class BaseController: 49 """Base Outpost deployment controller""" 50 51 deployment_ports: list[DeploymentPort] 52 client: BaseClient 53 outpost: Outpost 54 connection: OutpostServiceConnection 55 56 def __init__(self, outpost: Outpost, connection: OutpostServiceConnection): 57 self.outpost = outpost 58 self.connection = connection 59 self.logger = get_logger() 60 self.deployment_ports = [] 61 self.metrics_ports = [ 62 DeploymentPort(9300, "http-metrics", "tcp"), 63 ] 64 65 def up(self): 66 """Called by scheduled task to reconcile deployment/service/etc""" 67 raise NotImplementedError 68 69 def up_with_logs(self) -> list[LogEvent]: 70 """Call .up() but capture all log output and return it.""" 71 with capture_logs() as logs: 72 self.up() 73 return logs 74 75 def down(self): 76 """Handler to delete everything we've created""" 77 raise NotImplementedError 78 79 def down_with_logs(self) -> list[LogEvent]: 80 """Call .down() but capture all log output and return it.""" 81 with capture_logs() as logs: 82 self.down() 83 return logs 84 85 def __enter__(self): 86 return self 87 88 def __exit__(self, exc_type, exc_value, traceback): 89 """Cleanup after usage""" 90 if hasattr(self, "client"): 91 self.client.__exit__(exc_type, exc_value, traceback) 92 93 def get_static_deployment(self) -> str: 94 """Return a static deployment configuration""" 95 raise NotImplementedError 96 97 def get_container_image(self) -> str: 98 """Get container image to use for this outpost""" 99 if self.outpost.config.container_image is not None: 100 return self.outpost.config.container_image 101 102 image_name_template: str = CONFIG.get("outposts.container_image_base") 103 return image_name_template % { 104 "type": self.outpost.type, 105 "version": authentik_version(), 106 "build_hash": authentik_build_hash(), 107 }
FIELD_MANAGER =
'goauthentik.io'
21class ControllerException(SentryIgnoredException): 22 """Exception raised when anything fails during controller run"""
Exception raised when anything fails during controller run
@dataclass
class
DeploymentPort:
25@dataclass 26class DeploymentPort: 27 """Info about deployment's single port.""" 28 29 port: int 30 name: str 31 protocol: str 32 inner_port: int | None = None
Info about deployment's single port.
class
BaseClient:
35class BaseClient: 36 """Base class for custom clients""" 37 38 def fetch_state(self) -> OutpostServiceConnectionState: 39 """Get state, version info""" 40 raise NotImplementedError 41 42 def __enter__(self): 43 return self 44 45 def __exit__(self, exc_type, exc_value, traceback): 46 """Cleanup after usage"""
Base class for custom clients
class
BaseController:
49class BaseController: 50 """Base Outpost deployment controller""" 51 52 deployment_ports: list[DeploymentPort] 53 client: BaseClient 54 outpost: Outpost 55 connection: OutpostServiceConnection 56 57 def __init__(self, outpost: Outpost, connection: OutpostServiceConnection): 58 self.outpost = outpost 59 self.connection = connection 60 self.logger = get_logger() 61 self.deployment_ports = [] 62 self.metrics_ports = [ 63 DeploymentPort(9300, "http-metrics", "tcp"), 64 ] 65 66 def up(self): 67 """Called by scheduled task to reconcile deployment/service/etc""" 68 raise NotImplementedError 69 70 def up_with_logs(self) -> list[LogEvent]: 71 """Call .up() but capture all log output and return it.""" 72 with capture_logs() as logs: 73 self.up() 74 return logs 75 76 def down(self): 77 """Handler to delete everything we've created""" 78 raise NotImplementedError 79 80 def down_with_logs(self) -> list[LogEvent]: 81 """Call .down() but capture all log output and return it.""" 82 with capture_logs() as logs: 83 self.down() 84 return logs 85 86 def __enter__(self): 87 return self 88 89 def __exit__(self, exc_type, exc_value, traceback): 90 """Cleanup after usage""" 91 if hasattr(self, "client"): 92 self.client.__exit__(exc_type, exc_value, traceback) 93 94 def get_static_deployment(self) -> str: 95 """Return a static deployment configuration""" 96 raise NotImplementedError 97 98 def get_container_image(self) -> str: 99 """Get container image to use for this outpost""" 100 if self.outpost.config.container_image is not None: 101 return self.outpost.config.container_image 102 103 image_name_template: str = CONFIG.get("outposts.container_image_base") 104 return image_name_template % { 105 "type": self.outpost.type, 106 "version": authentik_version(), 107 "build_hash": authentik_build_hash(), 108 }
Base Outpost deployment controller
BaseController( outpost: authentik.outposts.models.Outpost, connection: authentik.outposts.models.OutpostServiceConnection)
deployment_ports: list[DeploymentPort]
client: BaseClient
outpost: authentik.outposts.models.Outpost
def
up(self):
66 def up(self): 67 """Called by scheduled task to reconcile deployment/service/etc""" 68 raise NotImplementedError
Called by scheduled task to reconcile deployment/service/etc
70 def up_with_logs(self) -> list[LogEvent]: 71 """Call .up() but capture all log output and return it.""" 72 with capture_logs() as logs: 73 self.up() 74 return logs
Call .up() but capture all log output and return it.
80 def down_with_logs(self) -> list[LogEvent]: 81 """Call .down() but capture all log output and return it.""" 82 with capture_logs() as logs: 83 self.down() 84 return logs
Call .down() but capture all log output and return it.
def
get_static_deployment(self) -> str:
94 def get_static_deployment(self) -> str: 95 """Return a static deployment configuration""" 96 raise NotImplementedError
Return a static deployment configuration
def
get_container_image(self) -> str:
98 def get_container_image(self) -> str: 99 """Get container image to use for this outpost""" 100 if self.outpost.config.container_image is not None: 101 return self.outpost.config.container_image 102 103 image_name_template: str = CONFIG.get("outposts.container_image_base") 104 return image_name_template % { 105 "type": self.outpost.type, 106 "version": authentik_version(), 107 "build_hash": authentik_build_hash(), 108 }
Get container image to use for this outpost