authentik.outposts.docker_ssh
Docker SSH helper
"""Docker SSH helper"""

import os
from pathlib import Path
from tempfile import gettempdir

from docker.errors import DockerException

from authentik.crypto.models import CertificateKeyPair

# Marker lines that delimit the block this module writes into the ssh config.
HEADER = "### Managed by authentik"
FOOTER = "### End Managed by authentik"


def opener(path: Path | str, flags: int):
    """File opener to create files as 600 perms"""
    return os.open(path, flags, 0o600)


class SSHManagedExternallyException(DockerException):
    """Raised when the ssh config file is managed externally."""


class DockerInlineSSH:
    """Create paramiko ssh config from CertificateKeyPair

    Writes the keypair's private key to a file in the temp directory and adds
    a ``Host`` entry, delimited by HEADER/FOOTER marker lines, to
    ``~/.ssh/config``.
    """

    host: str
    keypair: CertificateKeyPair

    key_path: str
    config_path: Path
    header: str

    def __init__(self, host: str, keypair: CertificateKeyPair) -> None:
        """Validate the environment and remember host and keypair.

        Raises:
            SSHManagedExternallyException: ``~/.ssh/config`` exists but does
                not contain HEADER.
            DockerException: ``keypair`` is falsy.
        """
        self.host = host
        self.keypair = keypair
        self.config_path = Path("~/.ssh/config").expanduser()
        if self.config_path.exists() and HEADER not in self.config_path.read_text(encoding="utf-8"):
            # SSH Config file already exists and there's no header from us, meaning that it's
            # been externally mapped into the container for more complex configs
            raise SSHManagedExternallyException(
                "SSH Config exists and does not contain authentik header"
            )
        if not self.keypair:
            raise DockerException("keypair must be set for SSH connections")
        # Per-host header line; used to find this host's block again later.
        self.header = f"{HEADER} - {self.host}\n"

    def write_config(self, key_path: str) -> bool:
        """Update the local user's ssh config file

        Appends a managed block for ``self.host`` that points at ``key_path``.

        Returns:
            True if the block was appended, False if a block with this host's
            header line is already present (nothing is written then).
        """
        with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
            # "a+" opens the stream positioned at end of file; rewind so the
            # existing content is actually read. Writes in append mode still
            # land at the end regardless of the read position.
            ssh_config.seek(0)
            if self.header in ssh_config.readlines():
                return False
            ssh_config.writelines(
                [
                    self.header,
                    f"Host {self.host}\n",
                    f"    IdentityFile {key_path}\n",
                    "    StrictHostKeyChecking No\n",
                    "    UserKnownHostsFile /dev/null\n",
                    f"{FOOTER}\n",
                    "\n",
                ]
            )
        return True

    def write_key(self):
        """Write keypair's private key to a temporary file

        Returns:
            The path of the written key file as a string.
        """
        path = Path(gettempdir(), f"{self.keypair.pk}_private.pem")
        with open(path, "w", encoding="utf8", opener=opener) as _file:
            _file.write(self.keypair.key_data)
        return str(path)

    def write(self):
        """Write keyfile and update ssh config"""
        self.key_path = self.write_key()
        was_written = self.write_config(self.key_path)
        if not was_written:
            self.cleanup()

    def cleanup(self):
        """Cleanup when we're done

        Deletes the key file and removes this host's managed block (header
        line through the first following footer line, plus one trailing blank
        line if present) from the ssh config. Filesystem errors are ignored.
        """
        key_path = getattr(self, "key_path", None)
        if key_path:
            try:
                os.unlink(key_path)
            except OSError:
                # If we fail deleting a file it doesn't matter that much
                # since we're just in a container
                pass
        try:
            with open(self.config_path, encoding="utf-8") as ssh_config:
                lines = ssh_config.readlines()
            footer = f"{FOOTER}\n"
            # None (not 0) marks "not found": the header is legitimately at
            # index 0 when the config file only holds managed blocks.
            start = None
            end = None
            for idx, line in enumerate(lines):
                if start is None:
                    if line == self.header:
                        start = idx
                elif line == footer:
                    # First footer after our header closes our block; later
                    # footers belong to other hosts.
                    end = idx
                    break
            if start is None or end is None:
                # No complete block for this host; leave the file untouched.
                return
            stop = end + 1
            if stop < len(lines) and not lines[stop].strip():
                # Also drop the blank separator line written after the footer.
                stop += 1
            with open(self.config_path, "w", encoding="utf-8") as ssh_config:
                ssh_config.writelines(lines[:start] + lines[stop:])
        except OSError:
            # If we fail cleaning up the config it doesn't matter that much
            # since we're just in a container
            pass
HEADER =
'### Managed by authentik'
FOOTER =
'### End Managed by authentik'
def
opener(path: pathlib.Path | str, flags: int):
16def opener(path: Path | str, flags: int): 17 """File opener to create files as 600 perms""" 18 return os.open(path, flags, 0o600)
File opener to create files as 600 perms
class
SSHManagedExternallyException(docker.errors.DockerException):
class SSHManagedExternallyException(DockerException):
    """Signals that the ssh config file is not managed by this module.

    Raised when an existing ssh config lacks the authentik header marker.
    """
Raised when the ssh config file is managed externally.
class
DockerInlineSSH:
class DockerInlineSSH:
    """Create paramiko ssh config from CertificateKeyPair

    Writes the keypair's private key to a file in the temp directory and adds
    a ``Host`` entry, delimited by HEADER/FOOTER marker lines, to
    ``~/.ssh/config``.
    """

    host: str
    keypair: CertificateKeyPair

    key_path: str
    config_path: Path
    header: str

    def __init__(self, host: str, keypair: CertificateKeyPair) -> None:
        """Validate the environment and remember host and keypair.

        Raises:
            SSHManagedExternallyException: ``~/.ssh/config`` exists but does
                not contain HEADER.
            DockerException: ``keypair`` is falsy.
        """
        self.host = host
        self.keypair = keypair
        self.config_path = Path("~/.ssh/config").expanduser()
        if self.config_path.exists() and HEADER not in self.config_path.read_text(encoding="utf-8"):
            # SSH Config file already exists and there's no header from us, meaning that it's
            # been externally mapped into the container for more complex configs
            raise SSHManagedExternallyException(
                "SSH Config exists and does not contain authentik header"
            )
        if not self.keypair:
            raise DockerException("keypair must be set for SSH connections")
        # Per-host header line; used to find this host's block again later.
        self.header = f"{HEADER} - {self.host}\n"

    def write_config(self, key_path: str) -> bool:
        """Update the local user's ssh config file

        Appends a managed block for ``self.host`` that points at ``key_path``.

        Returns:
            True if the block was appended, False if a block with this host's
            header line is already present (nothing is written then).
        """
        with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
            # "a+" opens the stream positioned at end of file; rewind so the
            # existing content is actually read. Writes in append mode still
            # land at the end regardless of the read position.
            ssh_config.seek(0)
            if self.header in ssh_config.readlines():
                return False
            ssh_config.writelines(
                [
                    self.header,
                    f"Host {self.host}\n",
                    f"    IdentityFile {key_path}\n",
                    "    StrictHostKeyChecking No\n",
                    "    UserKnownHostsFile /dev/null\n",
                    f"{FOOTER}\n",
                    "\n",
                ]
            )
        return True

    def write_key(self):
        """Write keypair's private key to a temporary file

        Returns:
            The path of the written key file as a string.
        """
        path = Path(gettempdir(), f"{self.keypair.pk}_private.pem")
        with open(path, "w", encoding="utf8", opener=opener) as _file:
            _file.write(self.keypair.key_data)
        return str(path)

    def write(self):
        """Write keyfile and update ssh config"""
        self.key_path = self.write_key()
        was_written = self.write_config(self.key_path)
        if not was_written:
            self.cleanup()

    def cleanup(self):
        """Cleanup when we're done

        Deletes the key file and removes this host's managed block (header
        line through the first following footer line, plus one trailing blank
        line if present) from the ssh config. Filesystem errors are ignored.
        """
        key_path = getattr(self, "key_path", None)
        if key_path:
            try:
                os.unlink(key_path)
            except OSError:
                # If we fail deleting a file it doesn't matter that much
                # since we're just in a container
                pass
        try:
            with open(self.config_path, encoding="utf-8") as ssh_config:
                lines = ssh_config.readlines()
            footer = f"{FOOTER}\n"
            # None (not 0) marks "not found": the header is legitimately at
            # index 0 when the config file only holds managed blocks.
            start = None
            end = None
            for idx, line in enumerate(lines):
                if start is None:
                    if line == self.header:
                        start = idx
                elif line == footer:
                    # First footer after our header closes our block; later
                    # footers belong to other hosts.
                    end = idx
                    break
            if start is None or end is None:
                # No complete block for this host; leave the file untouched.
                return
            stop = end + 1
            if stop < len(lines) and not lines[stop].strip():
                # Also drop the blank separator line written after the footer.
                stop += 1
            with open(self.config_path, "w", encoding="utf-8") as ssh_config:
                ssh_config.writelines(lines[:start] + lines[stop:])
        except OSError:
            # If we fail cleaning up the config it doesn't matter that much
            # since we're just in a container
            pass
Create paramiko ssh config from CertificateKeyPair
DockerInlineSSH(host: str, keypair: authentik.crypto.models.CertificateKeyPair)
def __init__(self, host: str, keypair: CertificateKeyPair) -> None:
    """Store host and keypair after checking the ssh config can be managed.

    Raises:
        SSHManagedExternallyException: ``~/.ssh/config`` exists but does not
            contain the authentik header marker.
        DockerException: ``keypair`` is falsy.
    """
    self.host = host
    self.keypair = keypair
    self.config_path = Path("~/.ssh/config").expanduser()
    if self.config_path.exists():
        existing_config = self.config_path.read_text(encoding="utf-8")
        if HEADER not in existing_config:
            # A config without our marker was put there by someone else
            # (e.g. mapped into the container), so it must not be modified.
            raise SSHManagedExternallyException(
                "SSH Config exists and does not contain authentik header"
            )
    if not self.keypair:
        raise DockerException("keypair must be set for SSH connections")
    # Host-specific header line identifying this instance's config block.
    self.header = f"{HEADER} - {self.host}\n"
def
write_config(self, key_path: str) -> bool:
def write_config(self, key_path: str) -> bool:
    """Update the local user's ssh config file

    Appends a managed block for ``self.host`` that points at ``key_path``.

    Returns:
        True if the block was appended, False if a block with this host's
        header line is already present (nothing is written then).
    """
    with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
        # "a+" opens the stream positioned at end of file; rewind so the
        # existing content is actually read. Writes in append mode still
        # land at the end regardless of the read position.
        ssh_config.seek(0)
        if self.header in ssh_config.readlines():
            return False
        ssh_config.writelines(
            [
                self.header,
                f"Host {self.host}\n",
                f"    IdentityFile {key_path}\n",
                "    StrictHostKeyChecking No\n",
                "    UserKnownHostsFile /dev/null\n",
                f"{FOOTER}\n",
                "\n",
            ]
        )
    return True
Update the local user's ssh config file
def
write_key(self):
def write_key(self):
    """Dump the keypair's private key into the temp directory.

    The file is named ``<keypair.pk>_private.pem`` and is opened through the
    module's ``opener``. Returns the file path as a string.
    """
    key_file = Path(gettempdir()) / f"{self.keypair.pk}_private.pem"
    with open(key_file, "w", encoding="utf8", opener=opener) as handle:
        handle.write(self.keypair.key_data)
    return str(key_file)
Write keypair's private key to a temporary file
def
write(self):
74 def write(self): 75 """Write keyfile and update ssh config""" 76 self.key_path = self.write_key() 77 was_written = self.write_config(self.key_path) 78 if not was_written: 79 self.cleanup()
Write keyfile and update ssh config
def
cleanup(self):
def cleanup(self):
    """Cleanup when we're done

    Deletes the key file and removes this host's managed block (header line
    through the first following footer line, plus one trailing blank line if
    present) from the ssh config. Filesystem errors are ignored.
    """
    key_path = getattr(self, "key_path", None)
    if key_path:
        try:
            os.unlink(key_path)
        except OSError:
            # If we fail deleting a file it doesn't matter that much
            # since we're just in a container
            pass
    try:
        with open(self.config_path, encoding="utf-8") as ssh_config:
            lines = ssh_config.readlines()
        footer = f"{FOOTER}\n"
        # None (not 0) marks "not found": the header is legitimately at
        # index 0 when the config file only holds managed blocks.
        start = None
        end = None
        for idx, line in enumerate(lines):
            if start is None:
                if line == self.header:
                    start = idx
            elif line == footer:
                # First footer after our header closes our block; later
                # footers belong to other hosts.
                end = idx
                break
        if start is None or end is None:
            # No complete block for this host; leave the file untouched.
            return
        stop = end + 1
        if stop < len(lines) and not lines[stop].strip():
            # Also drop the blank separator line written after the footer.
            stop += 1
        with open(self.config_path, "w", encoding="utf-8") as ssh_config:
            ssh_config.writelines(lines[:start] + lines[stop:])
    except OSError:
        # If we fail cleaning up the config it doesn't matter that much
        # since we're just in a container
        pass
Cleanup when we're done