authentik.admin.files.backends.s3

  1from collections.abc import Generator, Iterator
  2from contextlib import contextmanager
  3from tempfile import SpooledTemporaryFile
  4from urllib.parse import urlsplit
  5
  6import boto3
  7from botocore.config import Config
  8from botocore.exceptions import ClientError
  9from django.db import connection
 10from django.http.request import HttpRequest
 11
 12from authentik.admin.files.backends.base import ManageableBackend, get_content_type
 13from authentik.admin.files.usage import FileUsage
 14from authentik.lib.config import CONFIG
 15from authentik.lib.utils.time import timedelta_from_string
 16
 17
 18class S3Backend(ManageableBackend):
 19    """S3-compatible object storage backend.
 20
 21    Stores files in s3-compatible storage:
 22    - Key prefix: {usage}/{schema}/{filename}
 23    - Supports full file management (upload, delete, list)
 24    - Generates presigned URLs for file access
 25    - Used when storage.backend=s3
 26    """
 27
 28    allowed_usages = list(FileUsage)  # All usages
 29    name = "s3"
 30
 31    def __init__(self, *args, **kwargs):
 32        super().__init__(*args, **kwargs)
 33        self._config = {}
 34        self._session = None
 35
 36    def _get_config(self, key: str, default: str | None) -> tuple[str | None, bool]:
 37        unset = object()
 38        current = self._config.get(key, unset)
 39        refreshed = CONFIG.refresh(
 40            f"storage.{self.usage.value}.{self.name}.{key}",
 41            CONFIG.refresh(f"storage.{self.name}.{key}", default),
 42        )
 43        if current is unset:
 44            current = refreshed
 45        self._config[key] = refreshed
 46        return (refreshed, current != refreshed)
 47
 48    @property
 49    def base_path(self) -> str:
 50        """S3 key prefix: {usage}/{schema}/"""
 51        return f"{self.usage.value}/{connection.schema_name}"
 52
 53    @property
 54    def bucket_name(self) -> str:
 55        return CONFIG.get(
 56            f"storage.{self.usage.value}.{self.name}.bucket_name",
 57            CONFIG.get(f"storage.{self.name}.bucket_name"),
 58        )
 59
 60    @property
 61    def session(self) -> boto3.Session:
 62        """Create boto3 session with configured credentials."""
 63        session_profile, session_profile_r = self._get_config("session_profile", None)
 64        if session_profile is not None:
 65            if session_profile_r or self._session is None:
 66                self._session = boto3.Session(profile_name=session_profile)
 67                return self._session
 68            else:
 69                return self._session
 70        else:
 71            access_key, access_key_r = self._get_config("access_key", None)
 72            secret_key, secret_key_r = self._get_config("secret_key", None)
 73            session_token, session_token_r = self._get_config("session_token", None)
 74            if access_key_r or secret_key_r or session_token_r or self._session is None:
 75                self._session = boto3.Session(
 76                    aws_access_key_id=access_key,
 77                    aws_secret_access_key=secret_key,
 78                    aws_session_token=session_token,
 79                )
 80                return self._session
 81            else:
 82                return self._session
 83
 84    @property
 85    def client(self):
 86        """Create S3 client with configured endpoint and region."""
 87        endpoint_url = CONFIG.get(
 88            f"storage.{self.usage.value}.{self.name}.endpoint",
 89            CONFIG.get(f"storage.{self.name}.endpoint", None),
 90        )
 91        use_ssl = CONFIG.get(
 92            f"storage.{self.usage.value}.{self.name}.use_ssl",
 93            CONFIG.get(f"storage.{self.name}.use_ssl", True),
 94        )
 95        region_name = CONFIG.get(
 96            f"storage.{self.usage.value}.{self.name}.region",
 97            CONFIG.get(f"storage.{self.name}.region", None),
 98        )
 99        addressing_style = CONFIG.get(
100            f"storage.{self.usage.value}.{self.name}.addressing_style",
101            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
102        )
103        signature_version = CONFIG.get(
104            f"storage.{self.usage.value}.{self.name}.signature_version",
105            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
106        )
107        # Keep signature_version pass-through and let boto3/botocore handle it.
108        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
109        # are the documented values:
110        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
111        # Botocore also supports additional signer names, so we intentionally do
112        # not enforce a restricted allowlist here.
113
114        return self.session.client(
115            "s3",
116            endpoint_url=endpoint_url,
117            use_ssl=use_ssl,
118            region_name=region_name,
119            config=Config(
120                signature_version=signature_version, s3={"addressing_style": addressing_style}
121            ),
122        )
123
124    @property
125    def manageable(self) -> bool:
126        return True
127
128    def supports_file(self, name: str) -> bool:
129        """We support all files"""
130        return True
131
132    def list_files(self) -> Generator[str]:
133        """List all files returning relative paths from base_path."""
134        paginator = self.client.get_paginator("list_objects_v2")
135        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")
136
137        for page in pages:
138            for obj in page.get("Contents", []):
139                key = obj["Key"]
140                # Remove base path prefix to get relative path
141                rel_path = key.removeprefix(f"{self.base_path}/")
142                if rel_path:  # Skip if it's just the directory itself
143                    yield rel_path
144
145    def file_url(
146        self,
147        name: str,
148        request: HttpRequest | None = None,
149        use_cache: bool = True,
150    ) -> str:
151        """Generate presigned URL for file access."""
152        use_https = CONFIG.get_bool(
153            f"storage.{self.usage.value}.{self.name}.secure_urls",
154            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
155        )
156
157        expires_in = int(
158            timedelta_from_string(
159                CONFIG.get(
160                    f"storage.{self.usage.value}.{self.name}.url_expiry",
161                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
162                )
163            ).total_seconds()
164        )
165
166        def _file_url(name: str, request: HttpRequest | None) -> str:
167            params = {
168                "Bucket": self.bucket_name,
169                "Key": f"{self.base_path}/{name}",
170            }
171
172            url = self.client.generate_presigned_url(
173                "get_object",
174                Params=params,
175                ExpiresIn=expires_in,
176                HttpMethod="GET",
177            )
178
179            # Support custom domain for S3-compatible storage (so not AWS)
180            # Well, can't you do custom domains on AWS as well?
181            custom_domain = CONFIG.get(
182                f"storage.{self.usage.value}.{self.name}.custom_domain",
183                CONFIG.get(f"storage.{self.name}.custom_domain", None),
184            )
185            if custom_domain:
186                parsed = urlsplit(url)
187                scheme = "https" if use_https else "http"
188                path = parsed.path
189
190                # When using path-style addressing, the presigned URL contains the bucket
191                # name in the path (e.g., /bucket-name/key). Since custom_domain must
192                # include the bucket name (per docs), strip it from the path to avoid
193                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
194                # Check with trailing slash to ensure exact bucket name match
195                if path.startswith(f"/{self.bucket_name}/"):
196                    path = path.removeprefix(f"/{self.bucket_name}")
197
198                # Normalize to avoid double slashes
199                custom_domain = custom_domain.rstrip("/")
200                if not path.startswith("/"):
201                    path = f"/{path}"
202
203                url = f"{scheme}://{custom_domain}{path}?{parsed.query}"
204
205            return url
206
207        if use_cache:
208            return self._cache_get_or_set(name, request, _file_url, expires_in)
209        else:
210            return _file_url(name, request)
211
212    def save_file(self, name: str, content: bytes) -> None:
213        """Save file to S3."""
214        self.client.put_object(
215            Bucket=self.bucket_name,
216            Key=f"{self.base_path}/{name}",
217            Body=content,
218            ACL="private",
219            ContentType=get_content_type(name),
220        )
221
222    @contextmanager
223    def save_file_stream(self, name: str) -> Iterator:
224        """Context manager for streaming file writes to S3."""
225        # Keep files in memory up to 5 MB
226        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
227            yield file
228            file.seek(0)
229            self.client.upload_fileobj(
230                Fileobj=file,
231                Bucket=self.bucket_name,
232                Key=f"{self.base_path}/{name}",
233                ExtraArgs={
234                    "ACL": "private",
235                    "ContentType": get_content_type(name),
236                },
237            )
238
239    def delete_file(self, name: str) -> None:
240        """Delete file from S3."""
241        self.client.delete_object(
242            Bucket=self.bucket_name,
243            Key=f"{self.base_path}/{name}",
244        )
245
246    def file_exists(self, name: str) -> bool:
247        """Check if a file exists in S3."""
248        try:
249            self.client.head_object(
250                Bucket=self.bucket_name,
251                Key=f"{self.base_path}/{name}",
252            )
253            return True
254        except ClientError:
255            return False
 19class S3Backend(ManageableBackend):
 20    """S3-compatible object storage backend.
 21
 22    Stores files in s3-compatible storage:
 23    - Key prefix: {usage}/{schema}/{filename}
 24    - Supports full file management (upload, delete, list)
 25    - Generates presigned URLs for file access
 26    - Used when storage.backend=s3
 27    """
 28
 29    allowed_usages = list(FileUsage)  # All usages
 30    name = "s3"
 31
 32    def __init__(self, *args, **kwargs):
 33        super().__init__(*args, **kwargs)
 34        self._config = {}
 35        self._session = None
 36
 37    def _get_config(self, key: str, default: str | None) -> tuple[str | None, bool]:
 38        unset = object()
 39        current = self._config.get(key, unset)
 40        refreshed = CONFIG.refresh(
 41            f"storage.{self.usage.value}.{self.name}.{key}",
 42            CONFIG.refresh(f"storage.{self.name}.{key}", default),
 43        )
 44        if current is unset:
 45            current = refreshed
 46        self._config[key] = refreshed
 47        return (refreshed, current != refreshed)
 48
 49    @property
 50    def base_path(self) -> str:
 51        """S3 key prefix: {usage}/{schema}/"""
 52        return f"{self.usage.value}/{connection.schema_name}"
 53
 54    @property
 55    def bucket_name(self) -> str:
 56        return CONFIG.get(
 57            f"storage.{self.usage.value}.{self.name}.bucket_name",
 58            CONFIG.get(f"storage.{self.name}.bucket_name"),
 59        )
 60
 61    @property
 62    def session(self) -> boto3.Session:
 63        """Create boto3 session with configured credentials."""
 64        session_profile, session_profile_r = self._get_config("session_profile", None)
 65        if session_profile is not None:
 66            if session_profile_r or self._session is None:
 67                self._session = boto3.Session(profile_name=session_profile)
 68                return self._session
 69            else:
 70                return self._session
 71        else:
 72            access_key, access_key_r = self._get_config("access_key", None)
 73            secret_key, secret_key_r = self._get_config("secret_key", None)
 74            session_token, session_token_r = self._get_config("session_token", None)
 75            if access_key_r or secret_key_r or session_token_r or self._session is None:
 76                self._session = boto3.Session(
 77                    aws_access_key_id=access_key,
 78                    aws_secret_access_key=secret_key,
 79                    aws_session_token=session_token,
 80                )
 81                return self._session
 82            else:
 83                return self._session
 84
 85    @property
 86    def client(self):
 87        """Create S3 client with configured endpoint and region."""
 88        endpoint_url = CONFIG.get(
 89            f"storage.{self.usage.value}.{self.name}.endpoint",
 90            CONFIG.get(f"storage.{self.name}.endpoint", None),
 91        )
 92        use_ssl = CONFIG.get(
 93            f"storage.{self.usage.value}.{self.name}.use_ssl",
 94            CONFIG.get(f"storage.{self.name}.use_ssl", True),
 95        )
 96        region_name = CONFIG.get(
 97            f"storage.{self.usage.value}.{self.name}.region",
 98            CONFIG.get(f"storage.{self.name}.region", None),
 99        )
100        addressing_style = CONFIG.get(
101            f"storage.{self.usage.value}.{self.name}.addressing_style",
102            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
103        )
104        signature_version = CONFIG.get(
105            f"storage.{self.usage.value}.{self.name}.signature_version",
106            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
107        )
108        # Keep signature_version pass-through and let boto3/botocore handle it.
109        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
110        # are the documented values:
111        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
112        # Botocore also supports additional signer names, so we intentionally do
113        # not enforce a restricted allowlist here.
114
115        return self.session.client(
116            "s3",
117            endpoint_url=endpoint_url,
118            use_ssl=use_ssl,
119            region_name=region_name,
120            config=Config(
121                signature_version=signature_version, s3={"addressing_style": addressing_style}
122            ),
123        )
124
125    @property
126    def manageable(self) -> bool:
127        return True
128
129    def supports_file(self, name: str) -> bool:
130        """We support all files"""
131        return True
132
133    def list_files(self) -> Generator[str]:
134        """List all files returning relative paths from base_path."""
135        paginator = self.client.get_paginator("list_objects_v2")
136        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")
137
138        for page in pages:
139            for obj in page.get("Contents", []):
140                key = obj["Key"]
141                # Remove base path prefix to get relative path
142                rel_path = key.removeprefix(f"{self.base_path}/")
143                if rel_path:  # Skip if it's just the directory itself
144                    yield rel_path
145
146    def file_url(
147        self,
148        name: str,
149        request: HttpRequest | None = None,
150        use_cache: bool = True,
151    ) -> str:
152        """Generate presigned URL for file access."""
153        use_https = CONFIG.get_bool(
154            f"storage.{self.usage.value}.{self.name}.secure_urls",
155            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
156        )
157
158        expires_in = int(
159            timedelta_from_string(
160                CONFIG.get(
161                    f"storage.{self.usage.value}.{self.name}.url_expiry",
162                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
163                )
164            ).total_seconds()
165        )
166
167        def _file_url(name: str, request: HttpRequest | None) -> str:
168            params = {
169                "Bucket": self.bucket_name,
170                "Key": f"{self.base_path}/{name}",
171            }
172
173            url = self.client.generate_presigned_url(
174                "get_object",
175                Params=params,
176                ExpiresIn=expires_in,
177                HttpMethod="GET",
178            )
179
180            # Support custom domain for S3-compatible storage (so not AWS)
181            # Well, can't you do custom domains on AWS as well?
182            custom_domain = CONFIG.get(
183                f"storage.{self.usage.value}.{self.name}.custom_domain",
184                CONFIG.get(f"storage.{self.name}.custom_domain", None),
185            )
186            if custom_domain:
187                parsed = urlsplit(url)
188                scheme = "https" if use_https else "http"
189                path = parsed.path
190
191                # When using path-style addressing, the presigned URL contains the bucket
192                # name in the path (e.g., /bucket-name/key). Since custom_domain must
193                # include the bucket name (per docs), strip it from the path to avoid
194                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
195                # Check with trailing slash to ensure exact bucket name match
196                if path.startswith(f"/{self.bucket_name}/"):
197                    path = path.removeprefix(f"/{self.bucket_name}")
198
199                # Normalize to avoid double slashes
200                custom_domain = custom_domain.rstrip("/")
201                if not path.startswith("/"):
202                    path = f"/{path}"
203
204                url = f"{scheme}://{custom_domain}{path}?{parsed.query}"
205
206            return url
207
208        if use_cache:
209            return self._cache_get_or_set(name, request, _file_url, expires_in)
210        else:
211            return _file_url(name, request)
212
213    def save_file(self, name: str, content: bytes) -> None:
214        """Save file to S3."""
215        self.client.put_object(
216            Bucket=self.bucket_name,
217            Key=f"{self.base_path}/{name}",
218            Body=content,
219            ACL="private",
220            ContentType=get_content_type(name),
221        )
222
223    @contextmanager
224    def save_file_stream(self, name: str) -> Iterator:
225        """Context manager for streaming file writes to S3."""
226        # Keep files in memory up to 5 MB
227        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
228            yield file
229            file.seek(0)
230            self.client.upload_fileobj(
231                Fileobj=file,
232                Bucket=self.bucket_name,
233                Key=f"{self.base_path}/{name}",
234                ExtraArgs={
235                    "ACL": "private",
236                    "ContentType": get_content_type(name),
237                },
238            )
239
240    def delete_file(self, name: str) -> None:
241        """Delete file from S3."""
242        self.client.delete_object(
243            Bucket=self.bucket_name,
244            Key=f"{self.base_path}/{name}",
245        )
246
247    def file_exists(self, name: str) -> bool:
248        """Check if a file exists in S3."""
249        try:
250            self.client.head_object(
251                Bucket=self.bucket_name,
252                Key=f"{self.base_path}/{name}",
253            )
254            return True
255        except ClientError:
256            return False

S3-compatible object storage backend.

Stores files in s3-compatible storage:

  • Key prefix: {usage}/{schema}/{filename}
  • Supports full file management (upload, delete, list)
  • Generates presigned URLs for file access
  • Used when storage.backend=s3
S3Backend(*args, **kwargs)
32    def __init__(self, *args, **kwargs):
33        super().__init__(*args, **kwargs)
34        self._config = {}
35        self._session = None

Initialize backend for the given usage type.

Args: usage: FileUsage type enum value

allowed_usages = [<FileUsage.MEDIA: 'media'>, <FileUsage.REPORTS: 'reports'>]
name = 's3'
base_path: str
49    @property
50    def base_path(self) -> str:
51        """S3 key prefix: {usage}/{schema}/"""
52        return f"{self.usage.value}/{connection.schema_name}"

S3 key prefix: {usage}/{schema} (no trailing slash)

bucket_name: str
54    @property
55    def bucket_name(self) -> str:
56        return CONFIG.get(
57            f"storage.{self.usage.value}.{self.name}.bucket_name",
58            CONFIG.get(f"storage.{self.name}.bucket_name"),
59        )
session: boto3.session.Session
61    @property
62    def session(self) -> boto3.Session:
63        """Create boto3 session with configured credentials."""
64        session_profile, session_profile_r = self._get_config("session_profile", None)
65        if session_profile is not None:
66            if session_profile_r or self._session is None:
67                self._session = boto3.Session(profile_name=session_profile)
68                return self._session
69            else:
70                return self._session
71        else:
72            access_key, access_key_r = self._get_config("access_key", None)
73            secret_key, secret_key_r = self._get_config("secret_key", None)
74            session_token, session_token_r = self._get_config("session_token", None)
75            if access_key_r or secret_key_r or session_token_r or self._session is None:
76                self._session = boto3.Session(
77                    aws_access_key_id=access_key,
78                    aws_secret_access_key=secret_key,
79                    aws_session_token=session_token,
80                )
81                return self._session
82            else:
83                return self._session

Create boto3 session with configured credentials.

client
 85    @property
 86    def client(self):
 87        """Create S3 client with configured endpoint and region."""
 88        endpoint_url = CONFIG.get(
 89            f"storage.{self.usage.value}.{self.name}.endpoint",
 90            CONFIG.get(f"storage.{self.name}.endpoint", None),
 91        )
 92        use_ssl = CONFIG.get(
 93            f"storage.{self.usage.value}.{self.name}.use_ssl",
 94            CONFIG.get(f"storage.{self.name}.use_ssl", True),
 95        )
 96        region_name = CONFIG.get(
 97            f"storage.{self.usage.value}.{self.name}.region",
 98            CONFIG.get(f"storage.{self.name}.region", None),
 99        )
100        addressing_style = CONFIG.get(
101            f"storage.{self.usage.value}.{self.name}.addressing_style",
102            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
103        )
104        signature_version = CONFIG.get(
105            f"storage.{self.usage.value}.{self.name}.signature_version",
106            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
107        )
108        # Keep signature_version pass-through and let boto3/botocore handle it.
109        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
110        # are the documented values:
111        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
112        # Botocore also supports additional signer names, so we intentionally do
113        # not enforce a restricted allowlist here.
114
115        return self.session.client(
116            "s3",
117            endpoint_url=endpoint_url,
118            use_ssl=use_ssl,
119            region_name=region_name,
120            config=Config(
121                signature_version=signature_version, s3={"addressing_style": addressing_style}
122            ),
123        )

Create S3 client with configured endpoint and region.

manageable: bool
125    @property
126    def manageable(self) -> bool:
127        return True

Whether this backend can actually be used for management.

Used only for the management check, not for creating the backend.

def supports_file(self, name: str) -> bool:
129    def supports_file(self, name: str) -> bool:
130        """We support all files"""
131        return True

We support all files

def list_files(self) -> Generator[str]:
133    def list_files(self) -> Generator[str]:
134        """List all files returning relative paths from base_path."""
135        paginator = self.client.get_paginator("list_objects_v2")
136        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")
137
138        for page in pages:
139            for obj in page.get("Contents", []):
140                key = obj["Key"]
141                # Remove base path prefix to get relative path
142                rel_path = key.removeprefix(f"{self.base_path}/")
143                if rel_path:  # Skip if it's just the directory itself
144                    yield rel_path

List all files returning relative paths from base_path.

def file_url( self, name: str, request: django.http.request.HttpRequest | None = None, use_cache: bool = True) -> str:
146    def file_url(
147        self,
148        name: str,
149        request: HttpRequest | None = None,
150        use_cache: bool = True,
151    ) -> str:
152        """Generate presigned URL for file access."""
153        use_https = CONFIG.get_bool(
154            f"storage.{self.usage.value}.{self.name}.secure_urls",
155            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
156        )
157
158        expires_in = int(
159            timedelta_from_string(
160                CONFIG.get(
161                    f"storage.{self.usage.value}.{self.name}.url_expiry",
162                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
163                )
164            ).total_seconds()
165        )
166
167        def _file_url(name: str, request: HttpRequest | None) -> str:
168            params = {
169                "Bucket": self.bucket_name,
170                "Key": f"{self.base_path}/{name}",
171            }
172
173            url = self.client.generate_presigned_url(
174                "get_object",
175                Params=params,
176                ExpiresIn=expires_in,
177                HttpMethod="GET",
178            )
179
180            # Support custom domain for S3-compatible storage (so not AWS)
181            # Well, can't you do custom domains on AWS as well?
182            custom_domain = CONFIG.get(
183                f"storage.{self.usage.value}.{self.name}.custom_domain",
184                CONFIG.get(f"storage.{self.name}.custom_domain", None),
185            )
186            if custom_domain:
187                parsed = urlsplit(url)
188                scheme = "https" if use_https else "http"
189                path = parsed.path
190
191                # When using path-style addressing, the presigned URL contains the bucket
192                # name in the path (e.g., /bucket-name/key). Since custom_domain must
193                # include the bucket name (per docs), strip it from the path to avoid
194                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
195                # Check with trailing slash to ensure exact bucket name match
196                if path.startswith(f"/{self.bucket_name}/"):
197                    path = path.removeprefix(f"/{self.bucket_name}")
198
199                # Normalize to avoid double slashes
200                custom_domain = custom_domain.rstrip("/")
201                if not path.startswith("/"):
202                    path = f"/{path}"
203
204                url = f"{scheme}://{custom_domain}{path}?{parsed.query}"
205
206            return url
207
208        if use_cache:
209            return self._cache_get_or_set(name, request, _file_url, expires_in)
210        else:
211            return _file_url(name, request)

Generate presigned URL for file access.

def save_file(self, name: str, content: bytes) -> None:
213    def save_file(self, name: str, content: bytes) -> None:
214        """Save file to S3."""
215        self.client.put_object(
216            Bucket=self.bucket_name,
217            Key=f"{self.base_path}/{name}",
218            Body=content,
219            ACL="private",
220            ContentType=get_content_type(name),
221        )

Save file to S3.

@contextmanager
def save_file_stream(self, name: str) -> Iterator:
223    @contextmanager
224    def save_file_stream(self, name: str) -> Iterator:
225        """Context manager for streaming file writes to S3."""
226        # Keep files in memory up to 5 MB
227        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
228            yield file
229            file.seek(0)
230            self.client.upload_fileobj(
231                Fileobj=file,
232                Bucket=self.bucket_name,
233                Key=f"{self.base_path}/{name}",
234                ExtraArgs={
235                    "ACL": "private",
236                    "ContentType": get_content_type(name),
237                },
238            )

Context manager for streaming file writes to S3.

def delete_file(self, name: str) -> None:
240    def delete_file(self, name: str) -> None:
241        """Delete file from S3."""
242        self.client.delete_object(
243            Bucket=self.bucket_name,
244            Key=f"{self.base_path}/{name}",
245        )

Delete file from S3.

def file_exists(self, name: str) -> bool:
247    def file_exists(self, name: str) -> bool:
248        """Check if a file exists in S3."""
249        try:
250            self.client.head_object(
251                Bucket=self.bucket_name,
252                Key=f"{self.base_path}/{name}",
253            )
254            return True
255        except ClientError:
256            return False

Check if a file exists in S3.