authentik.admin.files.backends.s3
from collections.abc import Generator, Iterator
from contextlib import contextmanager
from tempfile import SpooledTemporaryFile
from urllib.parse import urlsplit

import boto3
from botocore.config import Config
from botocore.exceptions import ClientError
from django.db import connection
from django.http.request import HttpRequest

from authentik.admin.files.backends.base import ManageableBackend, get_content_type
from authentik.admin.files.usage import FileUsage
from authentik.lib.config import CONFIG
from authentik.lib.utils.time import timedelta_from_string


class S3Backend(ManageableBackend):
    """S3-compatible object storage backend.

    Stores files in s3-compatible storage:
    - Key prefix: {usage}/{schema}/{filename}
    - Supports full file management (upload, delete, list)
    - Generates presigned URLs for file access
    - Used when storage.backend=s3
    """

    allowed_usages = list(FileUsage)  # All usages
    name = "s3"

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self._config = {}
        self._session = None

    def _get_config(self, key: str, default: str | None) -> tuple[str | None, bool]:
        unset = object()
        current = self._config.get(key, unset)
        refreshed = CONFIG.refresh(
            f"storage.{self.usage.value}.{self.name}.{key}",
            CONFIG.refresh(f"storage.{self.name}.{key}", default),
        )
        if current is unset:
            current = refreshed
        self._config[key] = refreshed
        return (refreshed, current != refreshed)

    @property
    def base_path(self) -> str:
        """S3 key prefix: {usage}/{schema}/"""
        return f"{self.usage.value}/{connection.schema_name}"

    @property
    def bucket_name(self) -> str:
        return CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.bucket_name",
            CONFIG.get(f"storage.{self.name}.bucket_name"),
        )

    @property
    def session(self) -> boto3.Session:
        """Create boto3 session with configured credentials."""
        session_profile, session_profile_r = self._get_config("session_profile", None)
        if session_profile is not None:
            if session_profile_r or self._session is None:
                self._session = boto3.Session(profile_name=session_profile)
                return self._session
            else:
                return self._session
        else:
            access_key, access_key_r = self._get_config("access_key", None)
            secret_key, secret_key_r = self._get_config("secret_key", None)
            session_token, session_token_r = self._get_config("session_token", None)
            if access_key_r or secret_key_r or session_token_r or self._session is None:
                self._session = boto3.Session(
                    aws_access_key_id=access_key,
                    aws_secret_access_key=secret_key,
                    aws_session_token=session_token,
                )
                return self._session
            else:
                return self._session

    @property
    def client(self):
        """Create S3 client with configured endpoint and region."""
        endpoint_url = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.endpoint",
            CONFIG.get(f"storage.{self.name}.endpoint", None),
        )
        use_ssl = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.use_ssl",
            CONFIG.get(f"storage.{self.name}.use_ssl", True),
        )
        region_name = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.region",
            CONFIG.get(f"storage.{self.name}.region", None),
        )
        addressing_style = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.addressing_style",
            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
        )
        signature_version = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.signature_version",
            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
        )
        # Keep signature_version pass-through and let boto3/botocore handle it.
        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
        # are the documented values:
        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
        # Botocore also supports additional signer names, so we intentionally do
        # not enforce a restricted allowlist here.

        return self.session.client(
            "s3",
            endpoint_url=endpoint_url,
            use_ssl=use_ssl,
            region_name=region_name,
            config=Config(
                signature_version=signature_version, s3={"addressing_style": addressing_style}
            ),
        )

    @property
    def manageable(self) -> bool:
        return True

    def supports_file(self, name: str) -> bool:
        """We support all files"""
        return True

    def list_files(self) -> Generator[str]:
        """List all files returning relative paths from base_path."""
        paginator = self.client.get_paginator("list_objects_v2")
        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")

        for page in pages:
            for obj in page.get("Contents", []):
                key = obj["Key"]
                # Remove base path prefix to get relative path
                rel_path = key.removeprefix(f"{self.base_path}/")
                if rel_path:  # Skip if it's just the directory itself
                    yield rel_path

    def file_url(
        self,
        name: str,
        request: HttpRequest | None = None,
        use_cache: bool = True,
    ) -> str:
        """Generate presigned URL for file access."""
        use_https = CONFIG.get_bool(
            f"storage.{self.usage.value}.{self.name}.secure_urls",
            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
        )

        expires_in = int(
            timedelta_from_string(
                CONFIG.get(
                    f"storage.{self.usage.value}.{self.name}.url_expiry",
                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
                )
            ).total_seconds()
        )

        def _file_url(name: str, request: HttpRequest | None) -> str:
            params = {
                "Bucket": self.bucket_name,
                "Key": f"{self.base_path}/{name}",
            }

            url = self.client.generate_presigned_url(
                "get_object",
                Params=params,
                ExpiresIn=expires_in,
                HttpMethod="GET",
            )

            # Support custom domain for S3-compatible storage (so not AWS)
            # Well, can't you do custom domains on AWS as well?
            custom_domain = CONFIG.get(
                f"storage.{self.usage.value}.{self.name}.custom_domain",
                CONFIG.get(f"storage.{self.name}.custom_domain", None),
            )
            if custom_domain:
                parsed = urlsplit(url)
                scheme = "https" if use_https else "http"
                path = parsed.path

                # When using path-style addressing, the presigned URL contains the bucket
                # name in the path (e.g., /bucket-name/key). Since custom_domain must
                # include the bucket name (per docs), strip it from the path to avoid
                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
                # Check with trailing slash to ensure exact bucket name match
                if path.startswith(f"/{self.bucket_name}/"):
                    path = path.removeprefix(f"/{self.bucket_name}")

                # Normalize to avoid double slashes
                custom_domain = custom_domain.rstrip("/")
                if not path.startswith("/"):
                    path = f"/{path}"

                url = f"{scheme}://{custom_domain}{path}?{parsed.query}"

            return url

        if use_cache:
            return self._cache_get_or_set(name, request, _file_url, expires_in)
        else:
            return _file_url(name, request)

    def save_file(self, name: str, content: bytes) -> None:
        """Save file to S3."""
        self.client.put_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
            Body=content,
            ACL="private",
            ContentType=get_content_type(name),
        )

    @contextmanager
    def save_file_stream(self, name: str) -> Iterator:
        """Context manager for streaming file writes to S3."""
        # Keep files in memory up to 5 MB
        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
            yield file
            file.seek(0)
            self.client.upload_fileobj(
                Fileobj=file,
                Bucket=self.bucket_name,
                Key=f"{self.base_path}/{name}",
                ExtraArgs={
                    "ACL": "private",
                    "ContentType": get_content_type(name),
                },
            )

    def delete_file(self, name: str) -> None:
        """Delete file from S3."""
        self.client.delete_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
        )

    def file_exists(self, name: str) -> bool:
        """Check if a file exists in S3."""
        try:
            self.client.head_object(
                Bucket=self.bucket_name,
                Key=f"{self.base_path}/{name}",
            )
            return True
        except ClientError:
            return False
class S3Backend(ManageableBackend):
S3-compatible object storage backend.
Stores files in s3-compatible storage:
- Key prefix: {usage}/{schema}/{filename}
- Supports full file management (upload, delete, list)
- Generates presigned URLs for file access
- Used when storage.backend=s3
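The configuration keys follow a two-level scheme derived from the properties below: a per-usage key (storage.{usage}.s3.*) overrides the backend-wide key (storage.s3.*). A short sketch of the lookup order; the "media" usage segment is purely illustrative.

from authentik.lib.config import CONFIG

bucket = CONFIG.get(
    "storage.media.s3.bucket_name",        # per-usage override (usage name illustrative)
    CONFIG.get("storage.s3.bucket_name"),  # backend-wide fallback
)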
S3Backend(*args, **kwargs)
def __init__(self, *args, **kwargs):
    super().__init__(*args, **kwargs)
    self._config = {}
    self._session = None
Initialize backend for the given usage type.
Args: usage: FileUsage type enum value
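A minimal instantiation sketch. The FileUsage member name and the keyword-argument form are assumptions (the base class signature is not shown on this page), and the resulting prefix depends on the active tenant schema.

from authentik.admin.files.backends.s3 import S3Backend
from authentik.admin.files.usage import FileUsage

backend = S3Backend(usage=FileUsage.MEDIA)  # hypothetical usage member
# Objects are stored under "{usage}/{schema}/", e.g. "media/public" (illustrative)
print(backend.base_path)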
base_path: str
@property
def base_path(self) -> str:
    """S3 key prefix: {usage}/{schema}/"""
    return f"{self.usage.value}/{connection.schema_name}"
S3 key prefix: {usage}/{schema}/
session: boto3.session.Session
@property
def session(self) -> boto3.Session:
    """Create boto3 session with configured credentials."""
    session_profile, session_profile_r = self._get_config("session_profile", None)
    if session_profile is not None:
        if session_profile_r or self._session is None:
            self._session = boto3.Session(profile_name=session_profile)
            return self._session
        else:
            return self._session
    else:
        access_key, access_key_r = self._get_config("access_key", None)
        secret_key, secret_key_r = self._get_config("secret_key", None)
        session_token, session_token_r = self._get_config("session_token", None)
        if access_key_r or secret_key_r or session_token_r or self._session is None:
            self._session = boto3.Session(
                aws_access_key_id=access_key,
                aws_secret_access_key=secret_key,
                aws_session_token=session_token,
            )
            return self._session
        else:
            return self._session
Create boto3 session with configured credentials.
client
@property
def client(self):
    """Create S3 client with configured endpoint and region."""
    endpoint_url = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.endpoint",
        CONFIG.get(f"storage.{self.name}.endpoint", None),
    )
    use_ssl = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.use_ssl",
        CONFIG.get(f"storage.{self.name}.use_ssl", True),
    )
    region_name = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.region",
        CONFIG.get(f"storage.{self.name}.region", None),
    )
    addressing_style = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.addressing_style",
        CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
    )
    signature_version = CONFIG.get(
        f"storage.{self.usage.value}.{self.name}.signature_version",
        CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
    )
    # Keep signature_version pass-through and let boto3/botocore handle it.
    # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
    # are the documented values:
    # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
    # Botocore also supports additional signer names, so we intentionally do
    # not enforce a restricted allowlist here.

    return self.session.client(
        "s3",
        endpoint_url=endpoint_url,
        use_ssl=use_ssl,
        region_name=region_name,
        config=Config(
            signature_version=signature_version, s3={"addressing_style": addressing_style}
        ),
    )
Create S3 client with configured endpoint and region.
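For reference, a standalone sketch of the equivalent boto3 call the property makes; the endpoint, region, addressing style, and credentials below are illustrative placeholders, not defaults. The inline comments name the configuration keys the property reads.

import boto3
from botocore.config import Config

session = boto3.Session(aws_access_key_id="...", aws_secret_access_key="...")
client = session.client(
    "s3",
    endpoint_url="https://minio.example.com",  # storage.s3.endpoint
    use_ssl=True,                               # storage.s3.use_ssl
    region_name="us-east-1",                    # storage.s3.region
    config=Config(
        signature_version="s3v4",               # storage.s3.signature_version
        s3={"addressing_style": "path"},        # storage.s3.addressing_style
    ),
)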
manageable: bool
Whether this backend can actually be used for management.
Used only for the management check, not for creating the backend.
def list_files(self) -> Generator[str]:
def list_files(self) -> Generator[str]:
    """List all files returning relative paths from base_path."""
    paginator = self.client.get_paginator("list_objects_v2")
    pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")

    for page in pages:
        for obj in page.get("Contents", []):
            key = obj["Key"]
            # Remove base path prefix to get relative path
            rel_path = key.removeprefix(f"{self.base_path}/")
            if rel_path:  # Skip if it's just the directory itself
                yield rel_path
List all files returning relative paths from base_path.
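A usage sketch, assuming backend is an S3Backend instance as above; keys are yielded lazily as the paginator walks the bucket, relative to base_path.

for rel_path in backend.list_files():
    print(rel_path)  # e.g. "icons/application.png" (illustrative)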
def file_url(self, name: str, request: django.http.request.HttpRequest | None = None, use_cache: bool = True) -> str:
def file_url(
    self,
    name: str,
    request: HttpRequest | None = None,
    use_cache: bool = True,
) -> str:
    """Generate presigned URL for file access."""
    use_https = CONFIG.get_bool(
        f"storage.{self.usage.value}.{self.name}.secure_urls",
        CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
    )

    expires_in = int(
        timedelta_from_string(
            CONFIG.get(
                f"storage.{self.usage.value}.{self.name}.url_expiry",
                CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
            )
        ).total_seconds()
    )

    def _file_url(name: str, request: HttpRequest | None) -> str:
        params = {
            "Bucket": self.bucket_name,
            "Key": f"{self.base_path}/{name}",
        }

        url = self.client.generate_presigned_url(
            "get_object",
            Params=params,
            ExpiresIn=expires_in,
            HttpMethod="GET",
        )

        # Support custom domain for S3-compatible storage (so not AWS)
        # Well, can't you do custom domains on AWS as well?
        custom_domain = CONFIG.get(
            f"storage.{self.usage.value}.{self.name}.custom_domain",
            CONFIG.get(f"storage.{self.name}.custom_domain", None),
        )
        if custom_domain:
            parsed = urlsplit(url)
            scheme = "https" if use_https else "http"
            path = parsed.path

            # When using path-style addressing, the presigned URL contains the bucket
            # name in the path (e.g., /bucket-name/key). Since custom_domain must
            # include the bucket name (per docs), strip it from the path to avoid
            # duplication. See: https://github.com/goauthentik/authentik/issues/19521
            # Check with trailing slash to ensure exact bucket name match
            if path.startswith(f"/{self.bucket_name}/"):
                path = path.removeprefix(f"/{self.bucket_name}")

            # Normalize to avoid double slashes
            custom_domain = custom_domain.rstrip("/")
            if not path.startswith("/"):
                path = f"/{path}"

            url = f"{scheme}://{custom_domain}{path}?{parsed.query}"

        return url

    if use_cache:
        return self._cache_get_or_set(name, request, _file_url, expires_in)
    else:
        return _file_url(name, request)
Generate presigned URL for file access.
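A hedged usage sketch: the expiry comes from storage.s3.url_expiry (default "minutes=15"), the result is cached through the base backend's _cache_get_or_set for the same lifetime, and use_cache=False forces a fresh presigned URL. The object name below is illustrative.

url = backend.file_url("icons/app.png")
fresh = backend.file_url("icons/app.png", use_cache=False)
# With storage.s3.custom_domain set (it must include the bucket name), the
# presigned URL's host is replaced while its signed query string is preserved.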
def save_file(self, name: str, content: bytes) -> None:
def save_file(self, name: str, content: bytes) -> None:
    """Save file to S3."""
    self.client.put_object(
        Bucket=self.bucket_name,
        Key=f"{self.base_path}/{name}",
        Body=content,
        ACL="private",
        ContentType=get_content_type(name),
    )
Save file to S3.
@contextmanager
def save_file_stream(self, name: str) -> Iterator:
@contextmanager
def save_file_stream(self, name: str) -> Iterator:
    """Context manager for streaming file writes to S3."""
    # Keep files in memory up to 5 MB
    with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
        yield file
        file.seek(0)
        self.client.upload_fileobj(
            Fileobj=file,
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
            ExtraArgs={
                "ACL": "private",
                "ContentType": get_content_type(name),
            },
        )
Context manager for streaming file writes to S3.
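A usage sketch: bytes written inside the block are spooled (in memory up to 5 MB, then on disk) and uploaded when the block exits. The object name is illustrative.

with backend.save_file_stream("exports/report.csv") as handle:
    handle.write(b"id,name\n")
    handle.write(b"1,example\n")
# The upload happens on exit, to f"{backend.base_path}/exports/report.csv"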
def delete_file(self, name: str) -> None:
def delete_file(self, name: str) -> None:
    """Delete file from S3."""
    self.client.delete_object(
        Bucket=self.bucket_name,
        Key=f"{self.base_path}/{name}",
    )
Delete file from S3.
def file_exists(self, name: str) -> bool:
def file_exists(self, name: str) -> bool:
    """Check if a file exists in S3."""
    try:
        self.client.head_object(
            Bucket=self.bucket_name,
            Key=f"{self.base_path}/{name}",
        )
        return True
    except ClientError:
        return False
Check if a file exists in S3.
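An illustrative round trip over the non-streaming helpers; the key is hypothetical.

name = "icons/app.png"
backend.save_file(name, b"\x89PNG\r\n\x1a\n...")  # private ACL, content type guessed from the name
assert backend.file_exists(name)
backend.delete_file(name)
assert not backend.file_exists(name)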