authentik.admin.files.backends.s3
"""S3-compatible object storage backend for authentik file management."""

from collections.abc import Generator, Iterator
from contextlib import contextmanager
from tempfile import SpooledTemporaryFile
from urllib.parse import urlsplit, urlunsplit

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from django.db import connection
from django.http.request import HttpRequest

from authentik.admin.files.backends.base import ManageableBackend, get_content_type
from authentik.admin.files.usage import FileUsage
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import timedelta_from_string


class S3Backend(ManageableBackend):
    """S3-compatible object storage backend.

    Stores files in s3-compatible storage:
    - Key prefix: {usage}/{schema}/{filename}
    - Supports full file management (upload, delete, list)
    - Generates presigned URLs for file access
    - Used when storage.backend=s3

    Settings are read from ``storage.{usage}.s3.<key>`` first and fall back to
    ``storage.s3.<key>``.
    """

    allowed_usages = list(FileUsage)  # All usages
    name = "s3"

    def __init__(self, *args, **kwargs):
        """Initialise the backend with empty config and session caches."""
        super().__init__(*args, **kwargs)
        # Last seen value per config key, maintained by _get_config().
        self._config = {}
        # Lazily created boto3 session, maintained by the session property.
        self._session = None

    def _get_config(self, key: str, default: str | None) -> tuple[str | None, bool]:
        """Refresh one setting and report whether it changed since the last read.

        Args:
            key: Setting name below ``storage.{usage}.s3`` / ``storage.s3``.
            default: Value used when neither location defines the setting.

        Returns:
            ``(value, changed)``. ``changed`` is always False on the first read
            of a key, because there is no previous value to compare against.
        """
        unset = object()
        current = self._config.get(key, unset)
        refreshed = CONFIG.refresh(
            f"storage.{self.usage.value}.{self.name}.{key}",
            CONFIG.refresh(f"storage.{self.name}.{key}", default),
        )
        if current is unset:
            current = refreshed
        self._config[key] = refreshed
        return (refreshed, current != refreshed)

    @property
    def base_path(self) -> str:
        """S3 key prefix ``{usage}/{schema}`` (without a trailing slash)."""
        return f"{self.usage.value}/{connection.schema_name}"

    @property
    def bucket_name(self) -> str:
        """Configured bucket name, usage-specific setting first."""
        return CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.bucket_name",
            CONFIG.get(f"storage.{self.name}.bucket_name"),
        )

    @property
    def session(self) -> boto3.Session:
        """Return the cached boto3 session, rebuilding it when credentials change.

        A configured ``session_profile`` takes precedence over explicit keys.
        Without a profile, the session is built from ``access_key``,
        ``secret_key`` and ``session_token``.
        """
        session_profile, session_profile_r = self._get_config("session_profile", None)
        if session_profile is not None:
            if session_profile_r or self._session is None:
                self._session = boto3.Session(profile_name=session_profile)
            return self._session

        access_key, access_key_r = self._get_config("access_key", None)
        secret_key, secret_key_r = self._get_config("secret_key", None)
        session_token, session_token_r = self._get_config("session_token", None)
        # session_profile_r must be part of this check: when a previously
        # configured profile is removed, the cached session still belongs to
        # that profile and has to be replaced even if no key changed.
        credentials_changed = (
            session_profile_r or access_key_r or secret_key_r or session_token_r
        )
        if credentials_changed or self._session is None:
            self._session = boto3.Session(
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                aws_session_token=session_token,
            )
        return self._session

    @property
    def client(self):
        """Create S3 client with configured endpoint and region.

        A new client is built on every access from the current configuration.
        """
        endpoint_url = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.endpoint",
            CONFIG.get(f"storage.{self.name}.endpoint", None),
        )
        # Parse as a boolean (like secure_urls in file_url); a raw string such
        # as "false" would otherwise be truthy and silently enable SSL.
        use_ssl = CONFIG.get_bool(
            f"storage.{self.usage.value}.{self.name}.use_ssl",
            CONFIG.get_bool(f"storage.{self.name}.use_ssl", True),
        )
        region_name = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.region",
            CONFIG.get(f"storage.{self.name}.region", None),
        )
        addressing_style = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.addressing_style",
            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
        )
        signature_version = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.signature_version",
            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
        )
        # Keep signature_version pass-through and let boto3/botocore handle it.
        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
        # are the documented values:
        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
        # Botocore also supports additional signer names, so we intentionally do
        # not enforce a restricted allowlist here.

        return self.session.client(
            "s3",
            endpoint_url=endpoint_url,
            use_ssl=use_ssl,
            region_name=region_name,
            config=Config(
                signature_version=signature_version, s3={"addressing_style": addressing_style}
            ),
        )

    @property
    def manageable(self) -> bool:
        """This backend always reports itself as manageable."""
        return True

    def supports_file(self, name: str) -> bool:
        """We support all files"""
        return True

    def list_files(self) -> Generator[str]:
        """List all files returning relative paths from base_path."""
        paginator = self.client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")

        for page in pages:
            # Pages without "Contents" are treated as empty.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Remove base path prefix to get relative path
                rel_path = key.removeprefix(f"{self.base_path}/")
                if rel_path:  # Skip if it's just the directory itself
                    yield rel_path

    def file_url(
        self,
        name: str,
        request: HttpRequest | None = None,
        use_cache: bool = True,
    ) -> str:
        """Generate presigned URL for file access.

        Args:
            name: File name relative to ``base_path``.
            request: Passed through to the URL builder; not read by it.
            use_cache: When True, go through ``_cache_get_or_set`` instead of
                signing a fresh URL.

        Returns:
            Presigned ``GetObject`` URL valid for the configured ``url_expiry``
            (default ``minutes=15``).
        """
        use_https = CONFIG.get_bool(
            f"storage.{self.usage.value}.{self.name}.secure_urls",
            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
        )

        expires_in = int(
            timedelta_from_string(
                CONFIG.get(
                    f"storage.{self.usage.value}.{self.name}.url_expiry",
                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
                )
            ).total_seconds()
        )

        def _file_url(name: str, request: HttpRequest | None) -> str:
            client = self.client
            params = {
                "Bucket": self.bucket_name,
                "Key": f"{self.base_path}/{name}",
            }

            # Build the request by hand (botocore private helpers) so the URL
            # can be rewritten before it is signed.
            operation_name = "GetObject"
            operation_model = client.meta.service_model.operation_model(operation_name)
            request_dict = client._convert_to_request_dict(
                params,
                operation_model,
                endpoint_url=client.meta.endpoint_url,
                context={"is_presign_request": True},
            )

            # Support custom domain for S3-compatible storage (so not AWS)
            # Well, can't you do custom domains on AWS as well?
            custom_domain = CONFIG.get(
                f"storage.{self.usage.value}.{self.name}.custom_domain",
                CONFIG.get(f"storage.{self.name}.custom_domain", None),
            )
            if custom_domain:
                scheme = "https" if use_https else "http"
                path = request_dict["url_path"]

                # When using path-style addressing, the presigned URL contains the bucket
                # name in the path (e.g., /bucket-name/key). Since custom_domain must
                # include the bucket name (per docs), strip it from the path to avoid
                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
                # Check with trailing slash to ensure exact bucket name match
                if path.startswith(f"/{self.bucket_name}/"):
                    path = path.removeprefix(f"/{self.bucket_name}")

                # Normalize to avoid double slashes
                custom_domain = custom_domain.rstrip("/")
                if not path.startswith("/"):
                    path = f"/{path}"

                custom_base = urlsplit(f"{scheme}://{custom_domain}")

                # Sign the final public URL instead of signing the internal S3 endpoint and
                # rewriting it afterwards. Presigned SigV4 URLs include the host header in the
                # canonical request, so post-sign host changes break strict backends like RustFS.
                public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path
                request_dict["url_path"] = public_path
                request_dict["url"] = urlunsplit(
                    (custom_base.scheme, custom_base.netloc, public_path, "", "")
                )

            return client._request_signer.generate_presigned_url(
                request_dict,
                operation_name,
                expires_in=expires_in,
            )

        if use_cache:
            return self._cache_get_or_set(name, request, _file_url, expires_in)
        else:
            return _file_url(name, request)

    def save_file(self, name: str, content: bytes) -> None:
        """Save file to S3 as a private object with a content type derived from ``name``."""
        self.client.put_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
            Body=content,
            ACL="private",
            ContentType=get_content_type(name),
        )

    @contextmanager
    def save_file_stream(self, name: str) -> Iterator:
        """Context manager for streaming file writes to S3.

        Yields a temporary file object; its contents are uploaded when the
        ``with`` body finishes without raising.
        """
        # Keep files in memory up to 5 MB
        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
            yield file
            # Rewind so the upload reads from the start of what was written.
            file.seek(0)
            self.client.upload_fileobj(
                Fileobj=file,
                Bucket=self.bucket_name,
                Key=f"{self.base_path}/{name}",
                ExtraArgs={
                    "ACL": "private",
                    "ContentType": get_content_type(name),
                },
            )

    def delete_file(self, name: str) -> None:
        """Delete file from S3."""
        self.client.delete_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
        )

    def file_exists(self, name: str) -> bool:
        """Check if a file exists in S3.

        Any ``ClientError`` from the HEAD request is reported as "does not exist".
        """
        try:
            self.client.head_object(
                Bucket=self.bucket_name,
                Key=f"{self.base_path}/{name}",
            )
            return True
        except ClientError:
            return False
class S3Backend(ManageableBackend):
    """S3-compatible object storage backend.

    Stores files in s3-compatible storage:
    - Key prefix: {usage}/{schema}/{filename}
    - Supports full file management (upload, delete, list)
    - Generates presigned URLs for file access
    - Used when storage.backend=s3

    Settings are read from ``storage.{usage}.s3.<key>`` first and fall back to
    ``storage.s3.<key>``.
    """

    allowed_usages = list(FileUsage)  # All usages
    name = "s3"

    def __init__(self, *args, **kwargs):
        """Initialise the backend with empty config and session caches."""
        super().__init__(*args, **kwargs)
        # Last seen value per config key, maintained by _get_config().
        self._config = {}
        # Lazily created boto3 session, maintained by the session property.
        self._session = None

    def _get_config(self, key: str, default: str | None) -> tuple[str | None, bool]:
        """Refresh one setting and report whether it changed since the last read.

        Args:
            key: Setting name below ``storage.{usage}.s3`` / ``storage.s3``.
            default: Value used when neither location defines the setting.

        Returns:
            ``(value, changed)``. ``changed`` is always False on the first read
            of a key, because there is no previous value to compare against.
        """
        unset = object()
        current = self._config.get(key, unset)
        refreshed = CONFIG.refresh(
            f"storage.{self.usage.value}.{self.name}.{key}",
            CONFIG.refresh(f"storage.{self.name}.{key}", default),
        )
        if current is unset:
            current = refreshed
        self._config[key] = refreshed
        return (refreshed, current != refreshed)

    @property
    def base_path(self) -> str:
        """S3 key prefix ``{usage}/{schema}`` (without a trailing slash)."""
        return f"{self.usage.value}/{connection.schema_name}"

    @property
    def bucket_name(self) -> str:
        """Configured bucket name, usage-specific setting first."""
        return CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.bucket_name",
            CONFIG.get(f"storage.{self.name}.bucket_name"),
        )

    @property
    def session(self) -> boto3.Session:
        """Return the cached boto3 session, rebuilding it when credentials change.

        A configured ``session_profile`` takes precedence over explicit keys.
        Without a profile, the session is built from ``access_key``,
        ``secret_key`` and ``session_token``.
        """
        session_profile, session_profile_r = self._get_config("session_profile", None)
        if session_profile is not None:
            if session_profile_r or self._session is None:
                self._session = boto3.Session(profile_name=session_profile)
            return self._session

        access_key, access_key_r = self._get_config("access_key", None)
        secret_key, secret_key_r = self._get_config("secret_key", None)
        session_token, session_token_r = self._get_config("session_token", None)
        # session_profile_r must be part of this check: when a previously
        # configured profile is removed, the cached session still belongs to
        # that profile and has to be replaced even if no key changed.
        credentials_changed = (
            session_profile_r or access_key_r or secret_key_r or session_token_r
        )
        if credentials_changed or self._session is None:
            self._session = boto3.Session(
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                aws_session_token=session_token,
            )
        return self._session

    @property
    def client(self):
        """Create S3 client with configured endpoint and region.

        A new client is built on every access from the current configuration.
        """
        endpoint_url = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.endpoint",
            CONFIG.get(f"storage.{self.name}.endpoint", None),
        )
        # Parse as a boolean (like secure_urls in file_url); a raw string such
        # as "false" would otherwise be truthy and silently enable SSL.
        use_ssl = CONFIG.get_bool(
            f"storage.{self.usage.value}.{self.name}.use_ssl",
            CONFIG.get_bool(f"storage.{self.name}.use_ssl", True),
        )
        region_name = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.region",
            CONFIG.get(f"storage.{self.name}.region", None),
        )
        addressing_style = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.addressing_style",
            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
        )
        signature_version = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.signature_version",
            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
        )
        # Keep signature_version pass-through and let boto3/botocore handle it.
        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
        # are the documented values:
        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
        # Botocore also supports additional signer names, so we intentionally do
        # not enforce a restricted allowlist here.

        return self.session.client(
            "s3",
            endpoint_url=endpoint_url,
            use_ssl=use_ssl,
            region_name=region_name,
            config=Config(
                signature_version=signature_version, s3={"addressing_style": addressing_style}
            ),
        )

    @property
    def manageable(self) -> bool:
        """This backend always reports itself as manageable."""
        return True

    def supports_file(self, name: str) -> bool:
        """We support all files"""
        return True

    def list_files(self) -> Generator[str]:
        """List all files returning relative paths from base_path."""
        paginator = self.client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")

        for page in pages:
            # Pages without "Contents" are treated as empty.
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Remove base path prefix to get relative path
                rel_path = key.removeprefix(f"{self.base_path}/")
                if rel_path:  # Skip if it's just the directory itself
                    yield rel_path

    def file_url(
        self,
        name: str,
        request: HttpRequest | None = None,
        use_cache: bool = True,
    ) -> str:
        """Generate presigned URL for file access.

        Args:
            name: File name relative to ``base_path``.
            request: Passed through to the URL builder; not read by it.
            use_cache: When True, go through ``_cache_get_or_set`` instead of
                signing a fresh URL.

        Returns:
            Presigned ``GetObject`` URL valid for the configured ``url_expiry``
            (default ``minutes=15``).
        """
        use_https = CONFIG.get_bool(
            f"storage.{self.usage.value}.{self.name}.secure_urls",
            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
        )

        expires_in = int(
            timedelta_from_string(
                CONFIG.get(
                    f"storage.{self.usage.value}.{self.name}.url_expiry",
                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
                )
            ).total_seconds()
        )

        def _file_url(name: str, request: HttpRequest | None) -> str:
            client = self.client
            params = {
                "Bucket": self.bucket_name,
                "Key": f"{self.base_path}/{name}",
            }

            # Build the request by hand (botocore private helpers) so the URL
            # can be rewritten before it is signed.
            operation_name = "GetObject"
            operation_model = client.meta.service_model.operation_model(operation_name)
            request_dict = client._convert_to_request_dict(
                params,
                operation_model,
                endpoint_url=client.meta.endpoint_url,
                context={"is_presign_request": True},
            )

            # Support custom domain for S3-compatible storage (so not AWS)
            # Well, can't you do custom domains on AWS as well?
            custom_domain = CONFIG.get(
                f"storage.{self.usage.value}.{self.name}.custom_domain",
                CONFIG.get(f"storage.{self.name}.custom_domain", None),
            )
            if custom_domain:
                scheme = "https" if use_https else "http"
                path = request_dict["url_path"]

                # When using path-style addressing, the presigned URL contains the bucket
                # name in the path (e.g., /bucket-name/key). Since custom_domain must
                # include the bucket name (per docs), strip it from the path to avoid
                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
                # Check with trailing slash to ensure exact bucket name match
                if path.startswith(f"/{self.bucket_name}/"):
                    path = path.removeprefix(f"/{self.bucket_name}")

                # Normalize to avoid double slashes
                custom_domain = custom_domain.rstrip("/")
                if not path.startswith("/"):
                    path = f"/{path}"

                custom_base = urlsplit(f"{scheme}://{custom_domain}")

                # Sign the final public URL instead of signing the internal S3 endpoint and
                # rewriting it afterwards. Presigned SigV4 URLs include the host header in the
                # canonical request, so post-sign host changes break strict backends like RustFS.
                public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path
                request_dict["url_path"] = public_path
                request_dict["url"] = urlunsplit(
                    (custom_base.scheme, custom_base.netloc, public_path, "", "")
                )

            return client._request_signer.generate_presigned_url(
                request_dict,
                operation_name,
                expires_in=expires_in,
            )

        if use_cache:
            return self._cache_get_or_set(name, request, _file_url, expires_in)
        else:
            return _file_url(name, request)

    def save_file(self, name: str, content: bytes) -> None:
        """Save file to S3 as a private object with a content type derived from ``name``."""
        self.client.put_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
            Body=content,
            ACL="private",
            ContentType=get_content_type(name),
        )

    @contextmanager
    def save_file_stream(self, name: str) -> Iterator:
        """Context manager for streaming file writes to S3.

        Yields a temporary file object; its contents are uploaded when the
        ``with`` body finishes without raising.
        """
        # Keep files in memory up to 5 MB
        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
            yield file
            # Rewind so the upload reads from the start of what was written.
            file.seek(0)
            self.client.upload_fileobj(
                Fileobj=file,
                Bucket=self.bucket_name,
                Key=f"{self.base_path}/{name}",
                ExtraArgs={
                    "ACL": "private",
                    "ContentType": get_content_type(name),
                },
            )

    def delete_file(self, name: str) -> None:
        """Delete file from S3."""
        self.client.delete_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
        )

    def file_exists(self, name: str) -> bool:
        """Check if a file exists in S3.

        Any ``ClientError`` from the HEAD request is reported as "does not exist".
        """
        try:
            self.client.head_object(
                Bucket=self.bucket_name,
                Key=f"{self.base_path}/{name}",
            )
            return True
        except ClientError:
            return False
S3-compatible object storage backend.
Stores files in s3-compatible storage:
- Key prefix: {usage}/{schema}/{filename}
- Supports full file management (upload, delete, list)
- Generates presigned URLs for file access
- Used when storage.backend=s3
S3Backend(*args, **kwargs)
def __init__(self, *args, **kwargs):
    """Forward all arguments to the parent backend and reset the local caches."""
    super().__init__(*args, **kwargs)
    # No boto3 session yet, and no config values have been seen.
    self._session = None
    self._config = {}
Initialize backend for the given usage type.
Args: usage: FileUsage type enum value
base_path: str
@property
def base_path(self) -> str:
    """Return the S3 key prefix ``{usage}/{schema}`` (no trailing slash)."""
    usage_segment = self.usage.value
    schema_segment = connection.schema_name
    return f"{usage_segment}/{schema_segment}"
S3 key prefix: {usage}/{schema} (without a trailing slash)
session: boto3.session.Session
@property
def session(self) -> boto3.Session:
    """Return the cached boto3 session, rebuilding it when credentials change.

    A configured ``session_profile`` takes precedence over explicit keys.
    Without a profile, the session is built from ``access_key``,
    ``secret_key`` and ``session_token``.
    """
    session_profile, session_profile_r = self._get_config("session_profile", None)
    if session_profile is not None:
        if session_profile_r or self._session is None:
            self._session = boto3.Session(profile_name=session_profile)
        return self._session

    access_key, access_key_r = self._get_config("access_key", None)
    secret_key, secret_key_r = self._get_config("secret_key", None)
    session_token, session_token_r = self._get_config("session_token", None)
    # session_profile_r must be part of this check: when a previously
    # configured profile is removed, the cached session still belongs to
    # that profile and has to be replaced even if no key changed.
    credentials_changed = (
        session_profile_r or access_key_r or secret_key_r or session_token_r
    )
    if credentials_changed or self._session is None:
        self._session = boto3.Session(
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            aws_session_token=session_token,
        )
    return self._session
Create boto3 session with configured credentials.
client
@property
def client(self):
    """Create S3 client with configured endpoint and region.

    A new client is built on every access from the current configuration.
    Each setting is read from ``storage.{usage}.s3.<key>`` first and falls
    back to ``storage.s3.<key>``.
    """
    endpoint_url = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.endpoint",
        CONFIG.get(f"storage.{self.name}.endpoint", None),
    )
    # Parse as a boolean (like secure_urls in file_url); a raw string such
    # as "false" would otherwise be truthy and silently enable SSL.
    use_ssl = CONFIG.get_bool(
        f"storage.{self.usage.value}.{self.name}.use_ssl",
        CONFIG.get_bool(f"storage.{self.name}.use_ssl", True),
    )
    region_name = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.region",
        CONFIG.get(f"storage.{self.name}.region", None),
    )
    addressing_style = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.addressing_style",
        CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
    )
    signature_version = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.signature_version",
        CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
    )
    # Keep signature_version pass-through and let boto3/botocore handle it.
    # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
    # are the documented values:
    # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
    # Botocore also supports additional signer names, so we intentionally do
    # not enforce a restricted allowlist here.

    return self.session.client(
        "s3",
        endpoint_url=endpoint_url,
        use_ssl=use_ssl,
        region_name=region_name,
        config=Config(
            signature_version=signature_version, s3={"addressing_style": addressing_style}
        ),
    )
Create S3 client with configured endpoint and region.
manageable: bool
Whether this backend can actually be used for management.
Used only for the management check, not for creating the backend.
def
list_files(self) -> Generator[str]:
def list_files(self) -> Generator[str]:
    """Yield the key of every stored object, relative to ``base_path``."""
    listing = self.client.get_paginator("list_objects_v2").paginate(
        Bucket=self.bucket_name, Prefix=f"{self.base_path}/"
    )

    for page in listing:
        for entry in page.get("Contents", []):
            # Strip the base path prefix; an empty remainder means the key
            # is the prefix itself, which is not a file.
            relative = entry["Key"].removeprefix(f"{self.base_path}/")
            if not relative:
                continue
            yield relative
List all files returning relative paths from base_path.
def
file_url( self, name: str, request: django.http.request.HttpRequest | None = None, use_cache: bool = True) -> str:
def file_url(
    self,
    name: str,
    request: HttpRequest | None = None,
    use_cache: bool = True,
) -> str:
    """Generate presigned URL for file access.

    Args:
        name: File name relative to ``base_path``.
        request: Passed through to the URL builder; not read by it.
        use_cache: When True, go through ``_cache_get_or_set`` instead of
            signing a fresh URL.

    Returns:
        Presigned ``GetObject`` URL valid for the configured ``url_expiry``
        (default ``minutes=15``).
    """
    # Scheme used only when a custom domain is configured (see below).
    use_https = CONFIG.get_bool(
        f"storage.{self.usage.value}.{self.name}.secure_urls",
        CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
    )

    # URL lifetime in whole seconds; also handed to the cache helper.
    expires_in = int(
        timedelta_from_string(
            CONFIG.get(
                f"storage.{self.usage.value}.{self.name}.url_expiry",
                CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
            )
        ).total_seconds()
    )

    def _file_url(name: str, request: HttpRequest | None) -> str:
        client = self.client
        params = {
            "Bucket": self.bucket_name,
            "Key": f"{self.base_path}/{name}",
        }

        # Build the request by hand (botocore private helpers) so the URL
        # can be rewritten before it is signed.
        operation_name = "GetObject"
        operation_model = client.meta.service_model.operation_model(operation_name)
        request_dict = client._convert_to_request_dict(
            params,
            operation_model,
            endpoint_url=client.meta.endpoint_url,
            context={"is_presign_request": True},
        )

        # Support custom domain for S3-compatible storage (so not AWS)
        # Well, can't you do custom domains on AWS as well?
        custom_domain = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.custom_domain",
            CONFIG.get(f"storage.{self.name}.custom_domain", None),
        )
        if custom_domain:
            scheme = "https" if use_https else "http"
            path = request_dict["url_path"]

            # When using path-style addressing, the presigned URL contains the bucket
            # name in the path (e.g., /bucket-name/key). Since custom_domain must
            # include the bucket name (per docs), strip it from the path to avoid
            # duplication. See: https://github.com/goauthentik/authentik/issues/19521
            # Check with trailing slash to ensure exact bucket name match
            if path.startswith(f"/{self.bucket_name}/"):
                path = path.removeprefix(f"/{self.bucket_name}")

            # Normalize to avoid double slashes
            custom_domain = custom_domain.rstrip("/")
            if not path.startswith("/"):
                path = f"/{path}"

            # custom_domain may carry its own path component; split it out.
            custom_base = urlsplit(f"{scheme}://{custom_domain}")

            # Sign the final public URL instead of signing the internal S3 endpoint and
            # rewriting it afterwards. Presigned SigV4 URLs include the host header in the
            # canonical request, so post-sign host changes break strict backends like RustFS.
            public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path
            request_dict["url_path"] = public_path
            request_dict["url"] = urlunsplit(
                (custom_base.scheme, custom_base.netloc, public_path, "", "")
            )

        return client._request_signer.generate_presigned_url(
            request_dict,
            operation_name,
            expires_in=expires_in,
        )

    if use_cache:
        # NOTE(review): presumably caches the URL for at most expires_in
        # seconds; confirm against _cache_get_or_set in the base class.
        return self._cache_get_or_set(name, request, _file_url, expires_in)
    else:
        return _file_url(name, request)
Generate presigned URL for file access.
def
save_file(self, name: str, content: bytes) -> None:
def save_file(self, name: str, content: bytes) -> None:
    """Upload ``content`` as a private object stored below ``base_path``."""
    s3 = self.client
    upload_args = {
        "Bucket": self.bucket_name,
        "Key": f"{self.base_path}/{name}",
        "Body": content,
        "ACL": "private",
        # Content type is derived from the file name.
        "ContentType": get_content_type(name),
    }
    s3.put_object(**upload_args)
Save file to S3.
@contextmanager
def
save_file_stream(self, name: str) -> Iterator:
@contextmanager
def save_file_stream(self, name: str) -> Iterator:
    """Yield a temporary file and upload what was written to it on exit.

    The upload only runs when the ``with`` body finishes without raising.
    """
    # Spool in memory until the data exceeds 5 MB.
    spool_limit = 5 * 1024 * 1024
    with SpooledTemporaryFile(max_size=spool_limit, suffix=".S3File") as spool:
        yield spool
        # Rewind so the upload reads from the start of what was written.
        spool.seek(0)
        s3 = self.client
        s3.upload_fileobj(
            Fileobj=spool,
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
            ExtraArgs={
                "ACL": "private",
                "ContentType": get_content_type(name),
            },
        )
Context manager for streaming file writes to S3.
def
delete_file(self, name: str) -> None:
def delete_file(self, name: str) -> None:
    """Remove the object stored for ``name`` below ``base_path``."""
    s3 = self.client
    target = {
        "Bucket": self.bucket_name,
        "Key": f"{self.base_path}/{name}",
    }
    s3.delete_object(**target)
Delete file from S3.
def
file_exists(self, name: str) -> bool:
def file_exists(self, name: str) -> bool:
    """Return True when a HEAD request for the object succeeds.

    Any ``ClientError`` is reported as "does not exist".
    """
    try:
        self.client.head_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
        )
    except ClientError:
        return False
    return True
Check if a file exists in S3.