authentik.admin.files.tests.utils
1import shutil 2import socket 3from tempfile import mkdtemp 4from urllib.parse import urlparse 5 6from authentik.admin.files.backends.s3 import S3Backend 7from authentik.admin.files.usage import FileUsage 8from authentik.lib.config import CONFIG, UNSET 9from authentik.lib.generators import generate_id 10 11S3_TEST_ENDPOINT = "http://localhost:8020" 12 13 14def s3_test_server_available() -> bool: 15 """Check if the S3 test server is reachable.""" 16 17 parsed = urlparse(S3_TEST_ENDPOINT) 18 try: 19 with socket.create_connection((parsed.hostname, parsed.port), timeout=2): 20 return True 21 except OSError: 22 return False 23 24 25class FileTestFileBackendMixin: 26 def setUp(self): 27 self.original_media_backend = CONFIG.get("storage.media.backend", UNSET) 28 self.original_media_backend_path = CONFIG.get("storage.media.file.path", UNSET) 29 self.media_backend_path = mkdtemp() 30 CONFIG.set("storage.media.backend", "file") 31 CONFIG.set("storage.media.file.path", str(self.media_backend_path)) 32 33 self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET) 34 self.original_reports_backend_path = CONFIG.get("storage.reports.file.path", UNSET) 35 self.reports_backend_path = mkdtemp() 36 CONFIG.set("storage.reports.backend", "file") 37 CONFIG.set("storage.reports.file.path", str(self.reports_backend_path)) 38 39 def tearDown(self): 40 if self.original_media_backend is not UNSET: 41 CONFIG.set("storage.media.backend", self.original_media_backend) 42 else: 43 CONFIG.delete("storage.media.backend") 44 if self.original_media_backend_path is not UNSET: 45 CONFIG.set("storage.media.file.path", self.original_media_backend_path) 46 else: 47 CONFIG.delete("storage.media.file.path") 48 shutil.rmtree(self.media_backend_path) 49 50 if self.original_reports_backend is not UNSET: 51 CONFIG.set("storage.reports.backend", self.original_reports_backend) 52 else: 53 CONFIG.delete("storage.reports.backend") 54 if self.original_reports_backend_path is not UNSET: 55 CONFIG.set("storage.reports.file.path", self.original_reports_backend_path) 56 else: 57 CONFIG.delete("storage.reports.file.path") 58 shutil.rmtree(self.reports_backend_path) 59 60 61class FileTestS3BackendMixin: 62 def setUp(self): 63 s3_config_keys = { 64 "endpoint", 65 "access_key", 66 "secret_key", 67 "bucket_name", 68 } 69 self.original_media_backend = CONFIG.get("storage.media.backend", UNSET) 70 CONFIG.set("storage.media.backend", "s3") 71 self.original_media_s3_settings = {} 72 for key in s3_config_keys: 73 self.original_media_s3_settings[key] = CONFIG.get(f"storage.media.s3.{key}", UNSET) 74 self.media_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower() 75 CONFIG.set("storage.media.s3.endpoint", S3_TEST_ENDPOINT) 76 CONFIG.set("storage.media.s3.access_key", "accessKey1") 77 CONFIG.set("storage.media.s3.secret_key", "secretKey1") 78 CONFIG.set("storage.media.s3.bucket_name", self.media_s3_bucket_name) 79 self.media_s3_backend = S3Backend(FileUsage.MEDIA) 80 self.media_s3_backend.client.create_bucket(Bucket=self.media_s3_bucket_name, ACL="private") 81 82 self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET) 83 CONFIG.set("storage.reports.backend", "s3") 84 self.original_reports_s3_settings = {} 85 for key in s3_config_keys: 86 self.original_reports_s3_settings[key] = CONFIG.get(f"storage.reports.s3.{key}", UNSET) 87 self.reports_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower() 88 CONFIG.set("storage.reports.s3.endpoint", S3_TEST_ENDPOINT) 89 CONFIG.set("storage.reports.s3.access_key", "accessKey1") 90 CONFIG.set("storage.reports.s3.secret_key", "secretKey1") 91 CONFIG.set("storage.reports.s3.bucket_name", self.reports_s3_bucket_name) 92 self.reports_s3_backend = S3Backend(FileUsage.REPORTS) 93 self.reports_s3_backend.client.create_bucket( 94 Bucket=self.reports_s3_bucket_name, ACL="private" 95 ) 96 97 def tearDown(self): 98 def delete_objects_in_bucket(client, bucket_name): 99 paginator = client.get_paginator("list_objects_v2") 100 pages = paginator.paginate(Bucket=bucket_name) 101 for page in pages: 102 if "Contents" not in page: 103 continue 104 for obj in page["Contents"]: 105 client.delete_object(Bucket=bucket_name, Key=obj["Key"]) 106 107 delete_objects_in_bucket(self.media_s3_backend.client, self.media_s3_bucket_name) 108 self.media_s3_backend.client.delete_bucket(Bucket=self.media_s3_bucket_name) 109 if self.original_media_backend is not UNSET: 110 CONFIG.set("storage.media.backend", self.original_media_backend) 111 else: 112 CONFIG.delete("storage.media.backend") 113 for k, v in self.original_media_s3_settings.items(): 114 if v is not UNSET: 115 CONFIG.set(f"storage.media.s3.{k}", v) 116 else: 117 CONFIG.delete(f"storage.media.s3.{k}") 118 119 delete_objects_in_bucket(self.reports_s3_backend.client, self.reports_s3_bucket_name) 120 self.reports_s3_backend.client.delete_bucket(Bucket=self.reports_s3_bucket_name) 121 if self.original_reports_backend is not UNSET: 122 CONFIG.set("storage.reports.backend", self.original_reports_backend) 123 else: 124 CONFIG.delete("storage.reports.backend") 125 for k, v in self.original_reports_s3_settings.items(): 126 if v is not UNSET: 127 CONFIG.set(f"storage.reports.s3.{k}", v) 128 else: 129 CONFIG.delete(f"storage.reports.s3.{k}")
S3_TEST_ENDPOINT =
'http://localhost:8020'
def
s3_test_server_available() -> bool:
15def s3_test_server_available() -> bool: 16 """Check if the S3 test server is reachable.""" 17 18 parsed = urlparse(S3_TEST_ENDPOINT) 19 try: 20 with socket.create_connection((parsed.hostname, parsed.port), timeout=2): 21 return True 22 except OSError: 23 return False
Check if the S3 test server is reachable.
class
FileTestFileBackendMixin:
26class FileTestFileBackendMixin: 27 def setUp(self): 28 self.original_media_backend = CONFIG.get("storage.media.backend", UNSET) 29 self.original_media_backend_path = CONFIG.get("storage.media.file.path", UNSET) 30 self.media_backend_path = mkdtemp() 31 CONFIG.set("storage.media.backend", "file") 32 CONFIG.set("storage.media.file.path", str(self.media_backend_path)) 33 34 self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET) 35 self.original_reports_backend_path = CONFIG.get("storage.reports.file.path", UNSET) 36 self.reports_backend_path = mkdtemp() 37 CONFIG.set("storage.reports.backend", "file") 38 CONFIG.set("storage.reports.file.path", str(self.reports_backend_path)) 39 40 def tearDown(self): 41 if self.original_media_backend is not UNSET: 42 CONFIG.set("storage.media.backend", self.original_media_backend) 43 else: 44 CONFIG.delete("storage.media.backend") 45 if self.original_media_backend_path is not UNSET: 46 CONFIG.set("storage.media.file.path", self.original_media_backend_path) 47 else: 48 CONFIG.delete("storage.media.file.path") 49 shutil.rmtree(self.media_backend_path) 50 51 if self.original_reports_backend is not UNSET: 52 CONFIG.set("storage.reports.backend", self.original_reports_backend) 53 else: 54 CONFIG.delete("storage.reports.backend") 55 if self.original_reports_backend_path is not UNSET: 56 CONFIG.set("storage.reports.file.path", self.original_reports_backend_path) 57 else: 58 CONFIG.delete("storage.reports.file.path") 59 shutil.rmtree(self.reports_backend_path)
def
setUp(self):
27 def setUp(self): 28 self.original_media_backend = CONFIG.get("storage.media.backend", UNSET) 29 self.original_media_backend_path = CONFIG.get("storage.media.file.path", UNSET) 30 self.media_backend_path = mkdtemp() 31 CONFIG.set("storage.media.backend", "file") 32 CONFIG.set("storage.media.file.path", str(self.media_backend_path)) 33 34 self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET) 35 self.original_reports_backend_path = CONFIG.get("storage.reports.file.path", UNSET) 36 self.reports_backend_path = mkdtemp() 37 CONFIG.set("storage.reports.backend", "file") 38 CONFIG.set("storage.reports.file.path", str(self.reports_backend_path))
def
tearDown(self):
40 def tearDown(self): 41 if self.original_media_backend is not UNSET: 42 CONFIG.set("storage.media.backend", self.original_media_backend) 43 else: 44 CONFIG.delete("storage.media.backend") 45 if self.original_media_backend_path is not UNSET: 46 CONFIG.set("storage.media.file.path", self.original_media_backend_path) 47 else: 48 CONFIG.delete("storage.media.file.path") 49 shutil.rmtree(self.media_backend_path) 50 51 if self.original_reports_backend is not UNSET: 52 CONFIG.set("storage.reports.backend", self.original_reports_backend) 53 else: 54 CONFIG.delete("storage.reports.backend") 55 if self.original_reports_backend_path is not UNSET: 56 CONFIG.set("storage.reports.file.path", self.original_reports_backend_path) 57 else: 58 CONFIG.delete("storage.reports.file.path") 59 shutil.rmtree(self.reports_backend_path)
class
FileTestS3BackendMixin:
62class FileTestS3BackendMixin: 63 def setUp(self): 64 s3_config_keys = { 65 "endpoint", 66 "access_key", 67 "secret_key", 68 "bucket_name", 69 } 70 self.original_media_backend = CONFIG.get("storage.media.backend", UNSET) 71 CONFIG.set("storage.media.backend", "s3") 72 self.original_media_s3_settings = {} 73 for key in s3_config_keys: 74 self.original_media_s3_settings[key] = CONFIG.get(f"storage.media.s3.{key}", UNSET) 75 self.media_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower() 76 CONFIG.set("storage.media.s3.endpoint", S3_TEST_ENDPOINT) 77 CONFIG.set("storage.media.s3.access_key", "accessKey1") 78 CONFIG.set("storage.media.s3.secret_key", "secretKey1") 79 CONFIG.set("storage.media.s3.bucket_name", self.media_s3_bucket_name) 80 self.media_s3_backend = S3Backend(FileUsage.MEDIA) 81 self.media_s3_backend.client.create_bucket(Bucket=self.media_s3_bucket_name, ACL="private") 82 83 self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET) 84 CONFIG.set("storage.reports.backend", "s3") 85 self.original_reports_s3_settings = {} 86 for key in s3_config_keys: 87 self.original_reports_s3_settings[key] = CONFIG.get(f"storage.reports.s3.{key}", UNSET) 88 self.reports_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower() 89 CONFIG.set("storage.reports.s3.endpoint", S3_TEST_ENDPOINT) 90 CONFIG.set("storage.reports.s3.access_key", "accessKey1") 91 CONFIG.set("storage.reports.s3.secret_key", "secretKey1") 92 CONFIG.set("storage.reports.s3.bucket_name", self.reports_s3_bucket_name) 93 self.reports_s3_backend = S3Backend(FileUsage.REPORTS) 94 self.reports_s3_backend.client.create_bucket( 95 Bucket=self.reports_s3_bucket_name, ACL="private" 96 ) 97 98 def tearDown(self): 99 def delete_objects_in_bucket(client, bucket_name): 100 paginator = client.get_paginator("list_objects_v2") 101 pages = paginator.paginate(Bucket=bucket_name) 102 for page in pages: 103 if "Contents" not in page: 104 continue 105 for obj in page["Contents"]: 106 client.delete_object(Bucket=bucket_name, Key=obj["Key"]) 107 108 delete_objects_in_bucket(self.media_s3_backend.client, self.media_s3_bucket_name) 109 self.media_s3_backend.client.delete_bucket(Bucket=self.media_s3_bucket_name) 110 if self.original_media_backend is not UNSET: 111 CONFIG.set("storage.media.backend", self.original_media_backend) 112 else: 113 CONFIG.delete("storage.media.backend") 114 for k, v in self.original_media_s3_settings.items(): 115 if v is not UNSET: 116 CONFIG.set(f"storage.media.s3.{k}", v) 117 else: 118 CONFIG.delete(f"storage.media.s3.{k}") 119 120 delete_objects_in_bucket(self.reports_s3_backend.client, self.reports_s3_bucket_name) 121 self.reports_s3_backend.client.delete_bucket(Bucket=self.reports_s3_bucket_name) 122 if self.original_reports_backend is not UNSET: 123 CONFIG.set("storage.reports.backend", self.original_reports_backend) 124 else: 125 CONFIG.delete("storage.reports.backend") 126 for k, v in self.original_reports_s3_settings.items(): 127 if v is not UNSET: 128 CONFIG.set(f"storage.reports.s3.{k}", v) 129 else: 130 CONFIG.delete(f"storage.reports.s3.{k}")
def
setUp(self):
63 def setUp(self): 64 s3_config_keys = { 65 "endpoint", 66 "access_key", 67 "secret_key", 68 "bucket_name", 69 } 70 self.original_media_backend = CONFIG.get("storage.media.backend", UNSET) 71 CONFIG.set("storage.media.backend", "s3") 72 self.original_media_s3_settings = {} 73 for key in s3_config_keys: 74 self.original_media_s3_settings[key] = CONFIG.get(f"storage.media.s3.{key}", UNSET) 75 self.media_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower() 76 CONFIG.set("storage.media.s3.endpoint", S3_TEST_ENDPOINT) 77 CONFIG.set("storage.media.s3.access_key", "accessKey1") 78 CONFIG.set("storage.media.s3.secret_key", "secretKey1") 79 CONFIG.set("storage.media.s3.bucket_name", self.media_s3_bucket_name) 80 self.media_s3_backend = S3Backend(FileUsage.MEDIA) 81 self.media_s3_backend.client.create_bucket(Bucket=self.media_s3_bucket_name, ACL="private") 82 83 self.original_reports_backend = CONFIG.get("storage.reports.backend", UNSET) 84 CONFIG.set("storage.reports.backend", "s3") 85 self.original_reports_s3_settings = {} 86 for key in s3_config_keys: 87 self.original_reports_s3_settings[key] = CONFIG.get(f"storage.reports.s3.{key}", UNSET) 88 self.reports_s3_bucket_name = f"authentik-test-{generate_id(10)}".lower() 89 CONFIG.set("storage.reports.s3.endpoint", S3_TEST_ENDPOINT) 90 CONFIG.set("storage.reports.s3.access_key", "accessKey1") 91 CONFIG.set("storage.reports.s3.secret_key", "secretKey1") 92 CONFIG.set("storage.reports.s3.bucket_name", self.reports_s3_bucket_name) 93 self.reports_s3_backend = S3Backend(FileUsage.REPORTS) 94 self.reports_s3_backend.client.create_bucket( 95 Bucket=self.reports_s3_bucket_name, ACL="private" 96 )
def
tearDown(self):
98 def tearDown(self): 99 def delete_objects_in_bucket(client, bucket_name): 100 paginator = client.get_paginator("list_objects_v2") 101 pages = paginator.paginate(Bucket=bucket_name) 102 for page in pages: 103 if "Contents" not in page: 104 continue 105 for obj in page["Contents"]: 106 client.delete_object(Bucket=bucket_name, Key=obj["Key"]) 107 108 delete_objects_in_bucket(self.media_s3_backend.client, self.media_s3_bucket_name) 109 self.media_s3_backend.client.delete_bucket(Bucket=self.media_s3_bucket_name) 110 if self.original_media_backend is not UNSET: 111 CONFIG.set("storage.media.backend", self.original_media_backend) 112 else: 113 CONFIG.delete("storage.media.backend") 114 for k, v in self.original_media_s3_settings.items(): 115 if v is not UNSET: 116 CONFIG.set(f"storage.media.s3.{k}", v) 117 else: 118 CONFIG.delete(f"storage.media.s3.{k}") 119 120 delete_objects_in_bucket(self.reports_s3_backend.client, self.reports_s3_bucket_name) 121 self.reports_s3_backend.client.delete_bucket(Bucket=self.reports_s3_bucket_name) 122 if self.original_reports_backend is not UNSET: 123 CONFIG.set("storage.reports.backend", self.original_reports_backend) 124 else: 125 CONFIG.delete("storage.reports.backend") 126 for k, v in self.original_reports_s3_settings.items(): 127 if v is not UNSET: 128 CONFIG.set(f"storage.reports.s3.{k}", v) 129 else: 130 CONFIG.delete(f"storage.reports.s3.{k}")