authentik.outposts.docker_ssh

Docker SSH helper

 1"""Docker SSH helper"""
 2
 3import os
 4from pathlib import Path
 5from tempfile import gettempdir
 6
 7from docker.errors import DockerException
 8
 9from authentik.crypto.models import CertificateKeyPair
10
11HEADER = "### Managed by authentik"
12FOOTER = "### End Managed by authentik"
13
14
15def opener(path: Path | str, flags: int):
16    """File opener to create files as 600 perms"""
17    return os.open(path, flags, 0o600)
18
19
class SSHManagedExternallyException(DockerException):
    """Signals that the ssh config file is managed outside of authentik.

    DockerInlineSSH raises this when an existing ssh config does not contain
    the authentik header marker.
    """
22
23
class DockerInlineSSH:
    """Create paramiko ssh config from CertificateKeyPair

    write() stores the keypair's private key in a temporary file and appends a
    ``Host`` block, delimited by HEADER/FOOTER marker lines, to ``~/.ssh/config``.
    cleanup() removes the key file and that block again.
    """

    host: str
    keypair: CertificateKeyPair

    # Path of the temporary private key file; only set once write() has run
    key_path: str
    # Location of the current user's ssh config file
    config_path: Path
    # Marker line (including trailing newline) that starts this host's block
    header: str

    def __init__(self, host: str, keypair: CertificateKeyPair) -> None:
        """Prepare the helper for ``host``; nothing is written yet.

        Raises:
            SSHManagedExternallyException: the ssh config exists but does not
                contain the authentik header.
            DockerException: no keypair was given.
        """
        self.host = host
        self.keypair = keypair
        self.config_path = Path("~/.ssh/config").expanduser()
        if self.config_path.exists():
            existing = self.config_path.read_text(encoding="utf-8")
            if HEADER not in existing:
                # SSH Config file already exists and there's no header from us, meaning that
                # it's been externally mapped into the container for more complex configs
                raise SSHManagedExternallyException(
                    "SSH Config exists and does not contain authentik header"
                )
        if not self.keypair:
            raise DockerException("keypair must be set for SSH connections")
        self.header = f"{HEADER} - {self.host}\n"

    def write_config(self, key_path: str) -> bool:
        """Update the local user's ssh config file

        Appends a managed block for ``self.host`` that points at ``key_path``.
        Returns False if the header line was found among the lines read from
        the file, True after the block has been appended.
        """
        with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
            # NOTE(review): with mode "a+" the stream appears to start at end-of-file, so
            # this read yields no lines and the duplicate check never matches. Left as-is
            # on purpose: write() reacts to False by calling cleanup(), which would tear
            # down an entry that may still be in use. Confirm the intended behaviour.
            if self.header in ssh_config.readlines():
                return False
            ssh_config.writelines(
                [
                    self.header,
                    f"Host {self.host}\n",
                    f"    IdentityFile {key_path}\n",
                    "    StrictHostKeyChecking No\n",
                    "    UserKnownHostsFile /dev/null\n",
                    f"{FOOTER}\n",
                    "\n",
                ]
            )
        return True

    def write_key(self):
        """Write keypair's private key to a temporary file

        The file is named after the keypair's primary key, is opened through
        ``opener`` and its path is returned as a string.
        """
        path = Path(gettempdir(), f"{self.keypair.pk}_private.pem")
        with open(path, "w", encoding="utf8", opener=opener) as _file:
            _file.write(self.keypair.key_data)
        return str(path)

    def write(self):
        """Write keyfile and update ssh config

        If write_config() reports that nothing was written, cleanup() is run.
        """
        self.key_path = self.write_key()
        was_written = self.write_config(self.key_path)
        if not was_written:
            self.cleanup()

    def cleanup(self):
        """Cleanup when we're done

        Deletes the temporary key file and removes this host's managed block
        (header line through footer line, plus the blank separator line that
        follows it) from the ssh config. Both steps are best-effort and
        independent: an OSError in one does not prevent the other.
        """
        # key_path is only assigned by write(); tolerate cleanup() without a prior write()
        key_path = getattr(self, "key_path", None)
        if key_path is not None:
            try:
                os.unlink(key_path)
            except OSError:
                # If we fail deleting a file it doesn't matter that much
                # since we're just in a container
                pass
        try:
            with open(self.config_path, encoding="utf-8") as ssh_config:
                lines = ssh_config.readlines()
            # None (rather than 0) marks "not found", so a block that starts on the
            # very first line of the file is still recognised
            start = None
            end = None
            for idx, line in enumerate(lines):
                if start is None:
                    if line == self.header:
                        start = idx
                elif line == f"{FOOTER}\n":
                    # Stop at the first footer after our header so blocks that
                    # belong to other hosts are left untouched
                    end = idx
                    break
            if start is None or end is None:
                # No complete block of ours in the file; do not modify it
                return
            resume = end + 1
            # Only swallow the line after the footer if it is the blank separator
            if resume < len(lines) and lines[resume] == "\n":
                resume += 1
            remaining = lines[:start] + lines[resume:]
            if "".join(remaining).strip():
                with open(self.config_path, "w", encoding="utf-8") as ssh_config:
                    ssh_config.writelines(remaining)
            else:
                # Nothing meaningful is left. __init__ treats an existing config without
                # our header as externally managed, so remove the file instead of
                # leaving an empty one behind.
                os.unlink(self.config_path)
        except OSError:
            # Same best-effort reasoning as for the key file above
            pass
def opener(path: pathlib.Path | str, flags: int):
16def opener(path: Path | str, flags: int):
17    """File opener to create files as 600 perms"""
18    return os.open(path, flags, 0o600)

File opener to create files as 600 perms

class SSHManagedExternallyException(docker.errors.DockerException):
class SSHManagedExternallyException(DockerException):
    """Signals that the ssh config file is managed outside of authentik.

    DockerInlineSSH raises this when an existing ssh config does not contain
    the authentik header marker.
    """

Raised when the ssh config file is managed externally.

class DockerInlineSSH:
 25class DockerInlineSSH:
 26    """Create paramiko ssh config from CertificateKeyPair"""
 27
 28    host: str
 29    keypair: CertificateKeyPair
 30
 31    key_path: str
 32    config_path: Path
 33    header: str
 34
 35    def __init__(self, host: str, keypair: CertificateKeyPair) -> None:
 36        self.host = host
 37        self.keypair = keypair
 38        self.config_path = Path("~/.ssh/config").expanduser()
 39        if self.config_path.exists() and HEADER not in self.config_path.read_text(encoding="utf-8"):
 40            # SSH Config file already exists and there's no header from us, meaning that it's
 41            # been externally mapped into the container for more complex configs
 42            raise SSHManagedExternallyException(
 43                "SSH Config exists and does not contain authentik header"
 44            )
 45        if not self.keypair:
 46            raise DockerException("keypair must be set for SSH connections")
 47        self.header = f"{HEADER} - {self.host}\n"
 48
 49    def write_config(self, key_path: str) -> bool:
 50        """Update the local user's ssh config file"""
 51        with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
 52            if self.header in ssh_config.readlines():
 53                return False
 54            ssh_config.writelines(
 55                [
 56                    self.header,
 57                    f"Host {self.host}\n",
 58                    f"    IdentityFile {key_path}\n",
 59                    "    StrictHostKeyChecking No\n",
 60                    "    UserKnownHostsFile /dev/null\n",
 61                    f"{FOOTER}\n",
 62                    "\n",
 63                ]
 64            )
 65        return True
 66
 67    def write_key(self):
 68        """Write keypair's private key to a temporary file"""
 69        path = Path(gettempdir(), f"{self.keypair.pk}_private.pem")
 70        with open(path, "w", encoding="utf8", opener=opener) as _file:
 71            _file.write(self.keypair.key_data)
 72        return str(path)
 73
 74    def write(self):
 75        """Write keyfile and update ssh config"""
 76        self.key_path = self.write_key()
 77        was_written = self.write_config(self.key_path)
 78        if not was_written:
 79            self.cleanup()
 80
 81    def cleanup(self):
 82        """Cleanup when we're done"""
 83        try:
 84            os.unlink(self.key_path)
 85            with open(self.config_path, encoding="utf-8") as ssh_config:
 86                start = 0
 87                end = 0
 88                lines = ssh_config.readlines()
 89                for idx, line in enumerate(lines):
 90                    if line == self.header:
 91                        start = idx
 92                    if start != 0 and line == f"{FOOTER}\n":
 93                        end = idx
 94            with open(self.config_path, "w+", encoding="utf-8") as ssh_config:
 95                lines = lines[:start] + lines[end + 2 :]
 96                ssh_config.writelines(lines)
 97        except OSError:
 98            # If we fail deleting a file it doesn't matter that much
 99            # since we're just in a container
100            pass

Create paramiko ssh config from CertificateKeyPair

DockerInlineSSH(host: str, keypair: authentik.crypto.models.CertificateKeyPair)
35    def __init__(self, host: str, keypair: CertificateKeyPair) -> None:
36        self.host = host
37        self.keypair = keypair
38        self.config_path = Path("~/.ssh/config").expanduser()
39        if self.config_path.exists() and HEADER not in self.config_path.read_text(encoding="utf-8"):
40            # SSH Config file already exists and there's no header from us, meaning that it's
41            # been externally mapped into the container for more complex configs
42            raise SSHManagedExternallyException(
43                "SSH Config exists and does not contain authentik header"
44            )
45        if not self.keypair:
46            raise DockerException("keypair must be set for SSH connections")
47        self.header = f"{HEADER} - {self.host}\n"
host: str
key_path: str
config_path: pathlib.Path
header: str
def write_config(self, key_path: str) -> bool:
49    def write_config(self, key_path: str) -> bool:
50        """Update the local user's ssh config file"""
51        with open(self.config_path, "a+", encoding="utf-8") as ssh_config:
52            if self.header in ssh_config.readlines():
53                return False
54            ssh_config.writelines(
55                [
56                    self.header,
57                    f"Host {self.host}\n",
58                    f"    IdentityFile {key_path}\n",
59                    "    StrictHostKeyChecking No\n",
60                    "    UserKnownHostsFile /dev/null\n",
61                    f"{FOOTER}\n",
62                    "\n",
63                ]
64            )
65        return True

Update the local user's ssh config file

def write_key(self):
67    def write_key(self):
68        """Write keypair's private key to a temporary file"""
69        path = Path(gettempdir(), f"{self.keypair.pk}_private.pem")
70        with open(path, "w", encoding="utf8", opener=opener) as _file:
71            _file.write(self.keypair.key_data)
72        return str(path)

Write keypair's private key to a temporary file

def write(self):
74    def write(self):
75        """Write keyfile and update ssh config"""
76        self.key_path = self.write_key()
77        was_written = self.write_config(self.key_path)
78        if not was_written:
79            self.cleanup()

Write keyfile and update ssh config

def cleanup(self):
 81    def cleanup(self):
 82        """Cleanup when we're done"""
 83        try:
 84            os.unlink(self.key_path)
 85            with open(self.config_path, encoding="utf-8") as ssh_config:
 86                start = 0
 87                end = 0
 88                lines = ssh_config.readlines()
 89                for idx, line in enumerate(lines):
 90                    if line == self.header:
 91                        start = idx
 92                    if start != 0 and line == f"{FOOTER}\n":
 93                        end = idx
 94            with open(self.config_path, "w+", encoding="utf-8") as ssh_config:
 95                lines = lines[:start] + lines[end + 2 :]
 96                ssh_config.writelines(lines)
 97        except OSError:
 98            # If we fail deleting a file it doesn't matter that much
 99            # since we're just in a container
100            pass

Cleanup when we're done