authentik.admin.files.tests.utils

  1import shutil
  2import socket
  3from tempfile import mkdtemp
  4from urllib.parse import urlparse
  5
  6from authentik.admin.files.backends.s3 import S3Backend
  7from authentik.admin.files.usage import FileUsage
  8from authentik.lib.config import CONFIG, UNSET
  9from authentik.lib.generators import generate_id
 10
 11S3_TEST_ENDPOINT = "http://localhost:8020"
 12
 13
 14def s3_test_server_available() -> bool:
 15    """Check if the S3 test server is reachable."""
 16
 17    parsed = urlparse(S3_TEST_ENDPOINT)
 18    try:
 19        with socket.create_connection((parsed.hostname, parsed.port), timeout=2):
 20            return True
 21    except OSError:
 22        return False
 23
 24
 25class FileTestFileBackendMixin:
 26    def setUp(self):
 27        self.original_media_backend = CONFIG.get("storage.media.backend", UNSET)
 28        self.original_media_backend_path = CONFIG.get("storage.media.file.path", UNSET)
 29        self.media_backend_path = mkdtemp()
 30        CONFIG.set("storage.media.backend", "file")
 31        CONFIG.set("storage.media.file.path", str(self.media_backend_path))
 32
 33        self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET)
 34        self.original_reports_backend_path = CONFIG.get("storage.reports.file.path", UNSET)
 35        self.reports_backend_path = mkdtemp()
 36        CONFIG.set("storage.reports.backend", "file")
 37        CONFIG.set("storage.reports.file.path", str(self.reports_backend_path))
 38
 39    def tearDown(self):
 40        if self.original_media_backend is not UNSET:
 41            CONFIG.set("storage.media.backend", self.original_media_backend)
 42        else:
 43            CONFIG.delete("storage.media.backend")
 44        if self.original_media_backend_path is not UNSET:
 45            CONFIG.set("storage.media.file.path", self.original_media_backend_path)
 46        else:
 47            CONFIG.delete("storage.media.file.path")
 48        shutil.rmtree(self.media_backend_path)
 49
 50        if self.original_reports_backend is not UNSET:
 51            CONFIG.set("storage.reports.backend", self.original_reports_backend)
 52        else:
 53            CONFIG.delete("storage.reports.backend")
 54        if self.original_reports_backend_path is not UNSET:
 55            CONFIG.set("storage.reports.file.path", self.original_reports_backend_path)
 56        else:
 57            CONFIG.delete("storage.reports.file.path")
 58        shutil.rmtree(self.reports_backend_path)
 59
 60
 61class FileTestS3BackendMixin:
 62    def setUp(self):
 63        s3_config_keys = {
 64            "endpoint",
 65            "access_key",
 66            "secret_key",
 67            "bucket_name",
 68        }
 69        self.original_media_backend = CONFIG.get("storage.media.backend", UNSET)
 70        CONFIG.set("storage.media.backend", "s3")
 71        self.original_media_s3_settings = {}
 72        for key in s3_config_keys:
 73            self.original_media_s3_settings[key] = CONFIG.get(f"storage.media.s3.{key}", UNSET)
 74        self.media_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
 75        CONFIG.set("storage.media.s3.endpoint", S3_TEST_ENDPOINT)
 76        CONFIG.set("storage.media.s3.access_key", "accessKey1")
 77        CONFIG.set("storage.media.s3.secret_key", "secretKey1")
 78        CONFIG.set("storage.media.s3.bucket_name", self.media_s3_bucket_name)
 79        self.media_s3_backend = S3Backend(FileUsage.MEDIA)
 80        self.media_s3_backend.client.create_bucket(Bucket=self.media_s3_bucket_name, ACL="private")
 81
 82        self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET)
 83        CONFIG.set("storage.reports.backend", "s3")
 84        self.original_reports_s3_settings = {}
 85        for key in s3_config_keys:
 86            self.original_reports_s3_settings[key] = CONFIG.get(f"storage.reports.s3.{key}", UNSET)
 87        self.reports_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
 88        CONFIG.set("storage.reports.s3.endpoint", S3_TEST_ENDPOINT)
 89        CONFIG.set("storage.reports.s3.access_key", "accessKey1")
 90        CONFIG.set("storage.reports.s3.secret_key", "secretKey1")
 91        CONFIG.set("storage.reports.s3.bucket_name", self.reports_s3_bucket_name)
 92        self.reports_s3_backend = S3Backend(FileUsage.REPORTS)
 93        self.reports_s3_backend.client.create_bucket(
 94            Bucket=self.reports_s3_bucket_name, ACL="private"
 95        )
 96
 97    def tearDown(self):
 98        def delete_objects_in_bucket(client, bucket_name):
 99            paginator = client.get_paginator("list_objects_v2")
100            pages = paginator.paginate(Bucket=bucket_name)
101            for page in pages:
102                if "Contents" not in page:
103                    continue
104                for obj in page["Contents"]:
105                    client.delete_object(Bucket=bucket_name, Key=obj["Key"])
106
107        delete_objects_in_bucket(self.media_s3_backend.client, self.media_s3_bucket_name)
108        self.media_s3_backend.client.delete_bucket(Bucket=self.media_s3_bucket_name)
109        if self.original_media_backend is not UNSET:
110            CONFIG.set("storage.media.backend", self.original_media_backend)
111        else:
112            CONFIG.delete("storage.media.backend")
113        for k, v in self.original_media_s3_settings.items():
114            if v is not UNSET:
115                CONFIG.set(f"storage.media.s3.{k}", v)
116            else:
117                CONFIG.delete(f"storage.media.s3.{k}")
118
119        delete_objects_in_bucket(self.reports_s3_backend.client, self.reports_s3_bucket_name)
120        self.reports_s3_backend.client.delete_bucket(Bucket=self.reports_s3_bucket_name)
121        if self.original_reports_backend is not UNSET:
122            CONFIG.set("storage.reports.backend", self.original_reports_backend)
123        else:
124            CONFIG.delete("storage.reports.backend")
125        for k, v in self.original_reports_s3_settings.items():
126            if v is not UNSET:
127                CONFIG.set(f"storage.reports.s3.{k}", v)
128            else:
129                CONFIG.delete(f"storage.reports.s3.{k}")
S3_TEST_ENDPOINT = 'http://localhost:8020'
def s3_test_server_available() -> bool:
def s3_test_server_available() -> bool:
    """Return ``True`` when the S3 test endpoint accepts a TCP connection.

    Connects to the host and port parsed from ``S3_TEST_ENDPOINT`` with a two
    second timeout; an ``OSError`` counts as "not reachable".
    """
    url = urlparse(S3_TEST_ENDPOINT)
    reachable = False
    try:
        with socket.create_connection((url.hostname, url.port), timeout=2):
            reachable = True
    except OSError:
        reachable = False
    return reachable

Check if the S3 test server is reachable.

class FileTestFileBackendMixin:
class FileTestFileBackendMixin:
    """Mixin that switches media and reports storage to temporary directories.

    The settings in effect before ``setUp`` are kept on the instance and
    written back by ``tearDown``, which also removes the directories.
    """

    def setUp(self):
        """Keep the current settings and select the ``file`` backend on temp dirs."""
        self.original_media_backend = CONFIG.get("storage.media.backend", UNSET)
        self.original_media_backend_path = CONFIG.get("storage.media.file.path", UNSET)
        self.media_backend_path = mkdtemp()
        CONFIG.set("storage.media.backend", "file")
        CONFIG.set("storage.media.file.path", str(self.media_backend_path))

        self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET)
        self.original_reports_backend_path = CONFIG.get("storage.reports.file.path", UNSET)
        self.reports_backend_path = mkdtemp()
        CONFIG.set("storage.reports.backend", "file")
        CONFIG.set("storage.reports.file.path", str(self.reports_backend_path))

    def tearDown(self):
        """Write the kept settings back and remove both temporary directories.

        Raises:
            OSError: if a directory cannot be removed; the settings are already
                restored by then and the other directory is still removed.
        """
        originals = {
            "storage.media.backend": self.original_media_backend,
            "storage.media.file.path": self.original_media_backend_path,
            "storage.reports.backend": self.original_reports_backend,
            "storage.reports.file.path": self.original_reports_backend_path,
        }
        # Settings first: a failing rmtree must not leave CONFIG on test paths.
        for key, original in originals.items():
            if original is UNSET:
                CONFIG.delete(key)
            else:
                CONFIG.set(key, original)

        try:
            shutil.rmtree(self.media_backend_path)
        finally:
            shutil.rmtree(self.reports_backend_path)
def setUp(self):
27    def setUp(self):
28        self.original_media_backend = CONFIG.get("storage.media.backend", UNSET)
29        self.original_media_backend_path = CONFIG.get("storage.media.file.path", UNSET)
30        self.media_backend_path = mkdtemp()
31        CONFIG.set("storage.media.backend", "file")
32        CONFIG.set("storage.media.file.path", str(self.media_backend_path))
33
34        self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET)
35        self.original_reports_backend_path = CONFIG.get("storage.reports.file.path", UNSET)
36        self.reports_backend_path = mkdtemp()
37        CONFIG.set("storage.reports.backend", "file")
38        CONFIG.set("storage.reports.file.path", str(self.reports_backend_path))
def tearDown(self):
40    def tearDown(self):
41        if self.original_media_backend is not UNSET:
42            CONFIG.set("storage.media.backend", self.original_media_backend)
43        else:
44            CONFIG.delete("storage.media.backend")
45        if self.original_media_backend_path is not UNSET:
46            CONFIG.set("storage.media.file.path", self.original_media_backend_path)
47        else:
48            CONFIG.delete("storage.media.file.path")
49        shutil.rmtree(self.media_backend_path)
50
51        if self.original_reports_backend is not UNSET:
52            CONFIG.set("storage.reports.backend", self.original_reports_backend)
53        else:
54            CONFIG.delete("storage.reports.backend")
55        if self.original_reports_backend_path is not UNSET:
56            CONFIG.set("storage.reports.file.path", self.original_reports_backend_path)
57        else:
58            CONFIG.delete("storage.reports.file.path")
59        shutil.rmtree(self.reports_backend_path)
class FileTestS3BackendMixin:
 62class FileTestS3BackendMixin:
 63    def setUp(self):
 64        s3_config_keys = {
 65            "endpoint",
 66            "access_key",
 67            "secret_key",
 68            "bucket_name",
 69        }
 70        self.original_media_backend = CONFIG.get("storage.media.backend", UNSET)
 71        CONFIG.set("storage.media.backend", "s3")
 72        self.original_media_s3_settings = {}
 73        for key in s3_config_keys:
 74            self.original_media_s3_settings[key] = CONFIG.get(f"storage.media.s3.{key}", UNSET)
 75        self.media_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
 76        CONFIG.set("storage.media.s3.endpoint", S3_TEST_ENDPOINT)
 77        CONFIG.set("storage.media.s3.access_key", "accessKey1")
 78        CONFIG.set("storage.media.s3.secret_key", "secretKey1")
 79        CONFIG.set("storage.media.s3.bucket_name", self.media_s3_bucket_name)
 80        self.media_s3_backend = S3Backend(FileUsage.MEDIA)
 81        self.media_s3_backend.client.create_bucket(Bucket=self.media_s3_bucket_name, ACL="private")
 82
 83        self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET)
 84        CONFIG.set("storage.reports.backend", "s3")
 85        self.original_reports_s3_settings = {}
 86        for key in s3_config_keys:
 87            self.original_reports_s3_settings[key] = CONFIG.get(f"storage.reports.s3.{key}", UNSET)
 88        self.reports_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
 89        CONFIG.set("storage.reports.s3.endpoint", S3_TEST_ENDPOINT)
 90        CONFIG.set("storage.reports.s3.access_key", "accessKey1")
 91        CONFIG.set("storage.reports.s3.secret_key", "secretKey1")
 92        CONFIG.set("storage.reports.s3.bucket_name", self.reports_s3_bucket_name)
 93        self.reports_s3_backend = S3Backend(FileUsage.REPORTS)
 94        self.reports_s3_backend.client.create_bucket(
 95            Bucket=self.reports_s3_bucket_name, ACL="private"
 96        )
 97
 98    def tearDown(self):
 99        def delete_objects_in_bucket(client, bucket_name):
100            paginator = client.get_paginator("list_objects_v2")
101            pages = paginator.paginate(Bucket=bucket_name)
102            for page in pages:
103                if "Contents" not in page:
104                    continue
105                for obj in page["Contents"]:
106                    client.delete_object(Bucket=bucket_name, Key=obj["Key"])
107
108        delete_objects_in_bucket(self.media_s3_backend.client, self.media_s3_bucket_name)
109        self.media_s3_backend.client.delete_bucket(Bucket=self.media_s3_bucket_name)
110        if self.original_media_backend is not UNSET:
111            CONFIG.set("storage.media.backend", self.original_media_backend)
112        else:
113            CONFIG.delete("storage.media.backend")
114        for k, v in self.original_media_s3_settings.items():
115            if v is not UNSET:
116                CONFIG.set(f"storage.media.s3.{k}", v)
117            else:
118                CONFIG.delete(f"storage.media.s3.{k}")
119
120        delete_objects_in_bucket(self.reports_s3_backend.client, self.reports_s3_bucket_name)
121        self.reports_s3_backend.client.delete_bucket(Bucket=self.reports_s3_bucket_name)
122        if self.original_reports_backend is not UNSET:
123            CONFIG.set("storage.reports.backend", self.original_reports_backend)
124        else:
125            CONFIG.delete("storage.reports.backend")
126        for k, v in self.original_reports_s3_settings.items():
127            if v is not UNSET:
128                CONFIG.set(f"storage.reports.s3.{k}", v)
129            else:
130                CONFIG.delete(f"storage.reports.s3.{k}")
def setUp(self):
63    def setUp(self):
64        s3_config_keys = {
65            "endpoint",
66            "access_key",
67            "secret_key",
68            "bucket_name",
69        }
70        self.original_media_backend = CONFIG.get("storage.media.backend", UNSET)
71        CONFIG.set("storage.media.backend", "s3")
72        self.original_media_s3_settings = {}
73        for key in s3_config_keys:
74            self.original_media_s3_settings[key] = CONFIG.get(f"storage.media.s3.{key}", UNSET)
75        self.media_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
76        CONFIG.set("storage.media.s3.endpoint", S3_TEST_ENDPOINT)
77        CONFIG.set("storage.media.s3.access_key", "accessKey1")
78        CONFIG.set("storage.media.s3.secret_key", "secretKey1")
79        CONFIG.set("storage.media.s3.bucket_name", self.media_s3_bucket_name)
80        self.media_s3_backend = S3Backend(FileUsage.MEDIA)
81        self.media_s3_backend.client.create_bucket(Bucket=self.media_s3_bucket_name, ACL="private")
82
83        self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET)
84        CONFIG.set("storage.reports.backend", "s3")
85        self.original_reports_s3_settings = {}
86        for key in s3_config_keys:
87            self.original_reports_s3_settings[key] = CONFIG.get(f"storage.reports.s3.{key}", UNSET)
88        self.reports_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower()
89        CONFIG.set("storage.reports.s3.endpoint", S3_TEST_ENDPOINT)
90        CONFIG.set("storage.reports.s3.access_key", "accessKey1")
91        CONFIG.set("storage.reports.s3.secret_key", "secretKey1")
92        CONFIG.set("storage.reports.s3.bucket_name", self.reports_s3_bucket_name)
93        self.reports_s3_backend = S3Backend(FileUsage.REPORTS)
94        self.reports_s3_backend.client.create_bucket(
95            Bucket=self.reports_s3_bucket_name, ACL="private"
96        )
def tearDown(self):
 98    def tearDown(self):
 99        def delete_objects_in_bucket(client, bucket_name):
100            paginator = client.get_paginator("list_objects_v2")
101            pages = paginator.paginate(Bucket=bucket_name)
102            for page in pages:
103                if "Contents" not in page:
104                    continue
105                for obj in page["Contents"]:
106                    client.delete_object(Bucket=bucket_name, Key=obj["Key"])
107
108        delete_objects_in_bucket(self.media_s3_backend.client, self.media_s3_bucket_name)
109        self.media_s3_backend.client.delete_bucket(Bucket=self.media_s3_bucket_name)
110        if self.original_media_backend is not UNSET:
111            CONFIG.set("storage.media.backend", self.original_media_backend)
112        else:
113            CONFIG.delete("storage.media.backend")
114        for k, v in self.original_media_s3_settings.items():
115            if v is not UNSET:
116                CONFIG.set(f"storage.media.s3.{k}", v)
117            else:
118                CONFIG.delete(f"storage.media.s3.{k}")
119
120        delete_objects_in_bucket(self.reports_s3_backend.client, self.reports_s3_bucket_name)
121        self.reports_s3_backend.client.delete_bucket(Bucket=self.reports_s3_bucket_name)
122        if self.original_reports_backend is not UNSET:
123            CONFIG.set("storage.reports.backend", self.original_reports_backend)
124        else:
125            CONFIG.delete("storage.reports.backend")
126        for k, v in self.original_reports_s3_settings.items():
127            if v is not UNSET:
128                CONFIG.set(f"storage.reports.s3.{k}", v)
129            else:
130                CONFIG.delete(f"storage.reports.s3.{k}")