authentik.admin.files.backends.s3

  1from collections.abc import Generator, Iterator
  2from contextlib import contextmanager
  3from tempfile import SpooledTemporaryFile
  4from urllib.parse import urlsplit, urlunsplit
  5
  6import boto3
  7from botocore.config import Config
  8from botocore.exceptions import ClientError
  9from django.db import connection
 10from django.http.request import HttpRequest
 11
 12from authentik.admin.files.backends.base import ManageableBackend, get_content_type
 13from authentik.admin.files.usage import FileUsage
 14from authentik.lib.config import CONFIG
 15from authentik.lib.utils.time import timedelta_from_string
 16
 17
 18class S3Backend(ManageableBackend):
 19    """S3-compatible object storage backend.
 20
 21    Stores files in s3-compatible storage:
 22    - Key prefix: {usage}/{schema}/{filename}
 23    - Supports full file management (upload, delete, list)
 24    - Generates presigned URLs for file access
 25    - Used when storage.backend=s3
 26    """
 27
 28    allowed_usages = list(FileUsage)  # All usages
 29    name = "s3"
 30
 31    def __init__(self, *args, **kwargs):
 32        super().__init__(*args, **kwargs)
 33        self._config = {}
 34        self._session = None
 35
 36    def _get_config(self, key: str, default: str | None) -> tuple[str | None, bool]:
 37        unset = object()
 38        current = self._config.get(key, unset)
 39        refreshed = CONFIG.refresh(
 40            f"storage.{self.usage.value}.{self.name}.{key}",
 41            CONFIG.refresh(f"storage.{self.name}.{key}", default),
 42        )
 43        if current is unset:
 44            current = refreshed
 45        self._config[key] = refreshed
 46        return (refreshed, current != refreshed)
 47
 48    @property
 49    def base_path(self) -> str:
 50        """S3 key prefix: {usage}/{schema}/"""
 51        return f"{self.usage.value}/{connection.schema_name}"
 52
 53    @property
 54    def bucket_name(self) -> str:
 55        return CONFIG.get(
 56            f"storage.{self.usage.value}.{self.name}.bucket_name",
 57            CONFIG.get(f"storage.{self.name}.bucket_name"),
 58        )
 59
 60    @property
 61    def session(self) -> boto3.Session:
 62        """Create boto3 session with configured credentials."""
 63        session_profile, session_profile_r = self._get_config("session_profile", None)
 64        if session_profile is not None:
 65            if session_profile_r or self._session is None:
 66                self._session = boto3.Session(profile_name=session_profile)
 67                return self._session
 68            else:
 69                return self._session
 70        else:
 71            access_key, access_key_r = self._get_config("access_key", None)
 72            secret_key, secret_key_r = self._get_config("secret_key", None)
 73            session_token, session_token_r = self._get_config("session_token", None)
 74            if access_key_r or secret_key_r or session_token_r or self._session is None:
 75                self._session = boto3.Session(
 76                    aws_access_key_id=access_key,
 77                    aws_secret_access_key=secret_key,
 78                    aws_session_token=session_token,
 79                )
 80                return self._session
 81            else:
 82                return self._session
 83
 84    @property
 85    def client(self):
 86        """Create S3 client with configured endpoint and region."""
 87        endpoint_url = CONFIG.get(
 88            f"storage.{self.usage.value}.{self.name}.endpoint",
 89            CONFIG.get(f"storage.{self.name}.endpoint", None),
 90        )
 91        use_ssl = CONFIG.get(
 92            f"storage.{self.usage.value}.{self.name}.use_ssl",
 93            CONFIG.get(f"storage.{self.name}.use_ssl", True),
 94        )
 95        region_name = CONFIG.get(
 96            f"storage.{self.usage.value}.{self.name}.region",
 97            CONFIG.get(f"storage.{self.name}.region", None),
 98        )
 99        addressing_style = CONFIG.get(
100            f"storage.{self.usage.value}.{self.name}.addressing_style",
101            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
102        )
103        signature_version = CONFIG.get(
104            f"storage.{self.usage.value}.{self.name}.signature_version",
105            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
106        )
107        # Keep signature_version pass-through and let boto3/botocore handle it.
108        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
109        # are the documented values:
110        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
111        # Botocore also supports additional signer names, so we intentionally do
112        # not enforce a restricted allowlist here.
113
114        return self.session.client(
115            "s3",
116            endpoint_url=endpoint_url,
117            use_ssl=use_ssl,
118            region_name=region_name,
119            config=Config(
120                signature_version=signature_version, s3={"addressing_style": addressing_style}
121            ),
122        )
123
124    @property
125    def manageable(self) -> bool:
126        return True
127
128    def supports_file(self, name: str) -> bool:
129        """We support all files"""
130        return True
131
132    def list_files(self) -> Generator[str]:
133        """List all files returning relative paths from base_path."""
134        paginator = self.client.get_paginator("list_objects_v2")
135        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")
136
137        for page in pages:
138            for obj in page.get("Contents", []):
139                key = obj["Key"]
140                # Remove base path prefix to get relative path
141                rel_path = key.removeprefix(f"{self.base_path}/")
142                if rel_path:  # Skip if it's just the directory itself
143                    yield rel_path
144
145    def file_url(
146        self,
147        name: str,
148        request: HttpRequest | None = None,
149        use_cache: bool = True,
150    ) -> str:
151        """Generate presigned URL for file access."""
152        use_https = CONFIG.get_bool(
153            f"storage.{self.usage.value}.{self.name}.secure_urls",
154            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
155        )
156
157        expires_in = int(
158            timedelta_from_string(
159                CONFIG.get(
160                    f"storage.{self.usage.value}.{self.name}.url_expiry",
161                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
162                )
163            ).total_seconds()
164        )
165
166        def _file_url(name: str, request: HttpRequest | None) -> str:
167            client = self.client
168            params = {
169                "Bucket": self.bucket_name,
170                "Key": f"{self.base_path}/{name}",
171            }
172
173            operation_name = "GetObject"
174            operation_model = client.meta.service_model.operation_model(operation_name)
175            request_dict = client._convert_to_request_dict(
176                params,
177                operation_model,
178                endpoint_url=client.meta.endpoint_url,
179                context={"is_presign_request": True},
180            )
181
182            # Support custom domain for S3-compatible storage (so not AWS)
183            # Well, can't you do custom domains on AWS as well?
184            custom_domain = CONFIG.get(
185                f"storage.{self.usage.value}.{self.name}.custom_domain",
186                CONFIG.get(f"storage.{self.name}.custom_domain", None),
187            )
188            if custom_domain:
189                scheme = "https" if use_https else "http"
190                path = request_dict["url_path"]
191
192                # When using path-style addressing, the presigned URL contains the bucket
193                # name in the path (e.g., /bucket-name/key). Since custom_domain must
194                # include the bucket name (per docs), strip it from the path to avoid
195                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
196                # Check with trailing slash to ensure exact bucket name match
197                if path.startswith(f"/{self.bucket_name}/"):
198                    path = path.removeprefix(f"/{self.bucket_name}")
199
200                # Normalize to avoid double slashes
201                custom_domain = custom_domain.rstrip("/")
202                if not path.startswith("/"):
203                    path = f"/{path}"
204
205                custom_base = urlsplit(f"{scheme}://{custom_domain}")
206
207                # Sign the final public URL instead of signing the internal S3 endpoint and
208                # rewriting it afterwards. Presigned SigV4 URLs include the host header in the
209                # canonical request, so post-sign host changes break strict backends like RustFS.
210                public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path
211                request_dict["url_path"] = public_path
212                request_dict["url"] = urlunsplit(
213                    (custom_base.scheme, custom_base.netloc, public_path, "", "")
214                )
215
216            return client._request_signer.generate_presigned_url(
217                request_dict,
218                operation_name,
219                expires_in=expires_in,
220            )
221
222        if use_cache:
223            return self._cache_get_or_set(name, request, _file_url, expires_in)
224        else:
225            return _file_url(name, request)
226
227    def save_file(self, name: str, content: bytes) -> None:
228        """Save file to S3."""
229        self.client.put_object(
230            Bucket=self.bucket_name,
231            Key=f"{self.base_path}/{name}",
232            Body=content,
233            ACL="private",
234            ContentType=get_content_type(name),
235        )
236
237    @contextmanager
238    def save_file_stream(self, name: str) -> Iterator:
239        """Context manager for streaming file writes to S3."""
240        # Keep files in memory up to 5 MB
241        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
242            yield file
243            file.seek(0)
244            self.client.upload_fileobj(
245                Fileobj=file,
246                Bucket=self.bucket_name,
247                Key=f"{self.base_path}/{name}",
248                ExtraArgs={
249                    "ACL": "private",
250                    "ContentType": get_content_type(name),
251                },
252            )
253
254    def delete_file(self, name: str) -> None:
255        """Delete file from S3."""
256        self.client.delete_object(
257            Bucket=self.bucket_name,
258            Key=f"{self.base_path}/{name}",
259        )
260
261    def file_exists(self, name: str) -> bool:
262        """Check if a file exists in S3."""
263        try:
264            self.client.head_object(
265                Bucket=self.bucket_name,
266                Key=f"{self.base_path}/{name}",
267            )
268            return True
269        except ClientError:
270            return False
 19class S3Backend(ManageableBackend):
 20    """S3-compatible object storage backend.
 21
 22    Stores files in s3-compatible storage:
 23    - Key prefix: {usage}/{schema}/{filename}
 24    - Supports full file management (upload, delete, list)
 25    - Generates presigned URLs for file access
 26    - Used when storage.backend=s3
 27    """
 28
 29    allowed_usages = list(FileUsage)  # All usages
 30    name = "s3"
 31
 32    def __init__(self, *args, **kwargs):
 33        super().__init__(*args, **kwargs)
 34        self._config = {}
 35        self._session = None
 36
 37    def _get_config(self, key: str, default: str | None) -> tuple[str | None, bool]:
 38        unset = object()
 39        current = self._config.get(key, unset)
 40        refreshed = CONFIG.refresh(
 41            f"storage.{self.usage.value}.{self.name}.{key}",
 42            CONFIG.refresh(f"storage.{self.name}.{key}", default),
 43        )
 44        if current is unset:
 45            current = refreshed
 46        self._config[key] = refreshed
 47        return (refreshed, current != refreshed)
 48
 49    @property
 50    def base_path(self) -> str:
 51        """S3 key prefix: {usage}/{schema}/"""
 52        return f"{self.usage.value}/{connection.schema_name}"
 53
 54    @property
 55    def bucket_name(self) -> str:
 56        return CONFIG.get(
 57            f"storage.{self.usage.value}.{self.name}.bucket_name",
 58            CONFIG.get(f"storage.{self.name}.bucket_name"),
 59        )
 60
 61    @property
 62    def session(self) -> boto3.Session:
 63        """Create boto3 session with configured credentials."""
 64        session_profile, session_profile_r = self._get_config("session_profile", None)
 65        if session_profile is not None:
 66            if session_profile_r or self._session is None:
 67                self._session = boto3.Session(profile_name=session_profile)
 68                return self._session
 69            else:
 70                return self._session
 71        else:
 72            access_key, access_key_r = self._get_config("access_key", None)
 73            secret_key, secret_key_r = self._get_config("secret_key", None)
 74            session_token, session_token_r = self._get_config("session_token", None)
 75            if access_key_r or secret_key_r or session_token_r or self._session is None:
 76                self._session = boto3.Session(
 77                    aws_access_key_id=access_key,
 78                    aws_secret_access_key=secret_key,
 79                    aws_session_token=session_token,
 80                )
 81                return self._session
 82            else:
 83                return self._session
 84
 85    @property
 86    def client(self):
 87        """Create S3 client with configured endpoint and region."""
 88        endpoint_url = CONFIG.get(
 89            f"storage.{self.usage.value}.{self.name}.endpoint",
 90            CONFIG.get(f"storage.{self.name}.endpoint", None),
 91        )
 92        use_ssl = CONFIG.get(
 93            f"storage.{self.usage.value}.{self.name}.use_ssl",
 94            CONFIG.get(f"storage.{self.name}.use_ssl", True),
 95        )
 96        region_name = CONFIG.get(
 97            f"storage.{self.usage.value}.{self.name}.region",
 98            CONFIG.get(f"storage.{self.name}.region", None),
 99        )
100        addressing_style = CONFIG.get(
101            f"storage.{self.usage.value}.{self.name}.addressing_style",
102            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
103        )
104        signature_version = CONFIG.get(
105            f"storage.{self.usage.value}.{self.name}.signature_version",
106            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
107        )
108        # Keep signature_version pass-through and let boto3/botocore handle it.
109        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
110        # are the documented values:
111        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
112        # Botocore also supports additional signer names, so we intentionally do
113        # not enforce a restricted allowlist here.
114
115        return self.session.client(
116            "s3",
117            endpoint_url=endpoint_url,
118            use_ssl=use_ssl,
119            region_name=region_name,
120            config=Config(
121                signature_version=signature_version, s3={"addressing_style": addressing_style}
122            ),
123        )
124
125    @property
126    def manageable(self) -> bool:
127        return True
128
129    def supports_file(self, name: str) -> bool:
130        """We support all files"""
131        return True
132
133    def list_files(self) -> Generator[str]:
134        """List all files returning relative paths from base_path."""
135        paginator = self.client.get_paginator("list_objects_v2")
136        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")
137
138        for page in pages:
139            for obj in page.get("Contents", []):
140                key = obj["Key"]
141                # Remove base path prefix to get relative path
142                rel_path = key.removeprefix(f"{self.base_path}/")
143                if rel_path:  # Skip if it's just the directory itself
144                    yield rel_path
145
146    def file_url(
147        self,
148        name: str,
149        request: HttpRequest | None = None,
150        use_cache: bool = True,
151    ) -> str:
152        """Generate presigned URL for file access."""
153        use_https = CONFIG.get_bool(
154            f"storage.{self.usage.value}.{self.name}.secure_urls",
155            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
156        )
157
158        expires_in = int(
159            timedelta_from_string(
160                CONFIG.get(
161                    f"storage.{self.usage.value}.{self.name}.url_expiry",
162                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
163                )
164            ).total_seconds()
165        )
166
167        def _file_url(name: str, request: HttpRequest | None) -> str:
168            client = self.client
169            params = {
170                "Bucket": self.bucket_name,
171                "Key": f"{self.base_path}/{name}",
172            }
173
174            operation_name = "GetObject"
175            operation_model = client.meta.service_model.operation_model(operation_name)
176            request_dict = client._convert_to_request_dict(
177                params,
178                operation_model,
179                endpoint_url=client.meta.endpoint_url,
180                context={"is_presign_request": True},
181            )
182
183            # Support custom domain for S3-compatible storage (so not AWS)
184            # Well, can't you do custom domains on AWS as well?
185            custom_domain = CONFIG.get(
186                f"storage.{self.usage.value}.{self.name}.custom_domain",
187                CONFIG.get(f"storage.{self.name}.custom_domain", None),
188            )
189            if custom_domain:
190                scheme = "https" if use_https else "http"
191                path = request_dict["url_path"]
192
193                # When using path-style addressing, the presigned URL contains the bucket
194                # name in the path (e.g., /bucket-name/key). Since custom_domain must
195                # include the bucket name (per docs), strip it from the path to avoid
196                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
197                # Check with trailing slash to ensure exact bucket name match
198                if path.startswith(f"/{self.bucket_name}/"):
199                    path = path.removeprefix(f"/{self.bucket_name}")
200
201                # Normalize to avoid double slashes
202                custom_domain = custom_domain.rstrip("/")
203                if not path.startswith("/"):
204                    path = f"/{path}"
205
206                custom_base = urlsplit(f"{scheme}://{custom_domain}")
207
208                # Sign the final public URL instead of signing the internal S3 endpoint and
209                # rewriting it afterwards. Presigned SigV4 URLs include the host header in the
210                # canonical request, so post-sign host changes break strict backends like RustFS.
211                public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path
212                request_dict["url_path"] = public_path
213                request_dict["url"] = urlunsplit(
214                    (custom_base.scheme, custom_base.netloc, public_path, "", "")
215                )
216
217            return client._request_signer.generate_presigned_url(
218                request_dict,
219                operation_name,
220                expires_in=expires_in,
221            )
222
223        if use_cache:
224            return self._cache_get_or_set(name, request, _file_url, expires_in)
225        else:
226            return _file_url(name, request)
227
228    def save_file(self, name: str, content: bytes) -> None:
229        """Save file to S3."""
230        self.client.put_object(
231            Bucket=self.bucket_name,
232            Key=f"{self.base_path}/{name}",
233            Body=content,
234            ACL="private",
235            ContentType=get_content_type(name),
236        )
237
238    @contextmanager
239    def save_file_stream(self, name: str) -> Iterator:
240        """Context manager for streaming file writes to S3."""
241        # Keep files in memory up to 5 MB
242        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
243            yield file
244            file.seek(0)
245            self.client.upload_fileobj(
246                Fileobj=file,
247                Bucket=self.bucket_name,
248                Key=f"{self.base_path}/{name}",
249                ExtraArgs={
250                    "ACL": "private",
251                    "ContentType": get_content_type(name),
252                },
253            )
254
255    def delete_file(self, name: str) -> None:
256        """Delete file from S3."""
257        self.client.delete_object(
258            Bucket=self.bucket_name,
259            Key=f"{self.base_path}/{name}",
260        )
261
262    def file_exists(self, name: str) -> bool:
263        """Check if a file exists in S3."""
264        try:
265            self.client.head_object(
266                Bucket=self.bucket_name,
267                Key=f"{self.base_path}/{name}",
268            )
269            return True
270        except ClientError:
271            return False

S3-compatible object storage backend.

Stores files in s3-compatible storage:

  • Key prefix: {usage}/{schema}/{filename}
  • Supports full file management (upload, delete, list)
  • Generates presigned URLs for file access
  • Used when storage.backend=s3
S3Backend(*args, **kwargs)
32    def __init__(self, *args, **kwargs):
33        super().__init__(*args, **kwargs)
34        self._config = {}
35        self._session = None

Initialize backend for the given usage type.

Args: usage: FileUsage type enum value

allowed_usages = [<FileUsage.MEDIA: 'media'>, <FileUsage.REPORTS: 'reports'>]
name = 's3'
base_path: str
49    @property
50    def base_path(self) -> str:
51        """S3 key prefix: {usage}/{schema}/"""
52        return f"{self.usage.value}/{connection.schema_name}"

S3 key prefix: {usage}/{schema} (no trailing slash)

bucket_name: str
54    @property
55    def bucket_name(self) -> str:
56        return CONFIG.get(
57            f"storage.{self.usage.value}.{self.name}.bucket_name",
58            CONFIG.get(f"storage.{self.name}.bucket_name"),
59        )
session: boto3.session.Session
61    @property
62    def session(self) -> boto3.Session:
63        """Create boto3 session with configured credentials."""
64        session_profile, session_profile_r = self._get_config("session_profile", None)
65        if session_profile is not None:
66            if session_profile_r or self._session is None:
67                self._session = boto3.Session(profile_name=session_profile)
68                return self._session
69            else:
70                return self._session
71        else:
72            access_key, access_key_r = self._get_config("access_key", None)
73            secret_key, secret_key_r = self._get_config("secret_key", None)
74            session_token, session_token_r = self._get_config("session_token", None)
75            if access_key_r or secret_key_r or session_token_r or self._session is None:
76                self._session = boto3.Session(
77                    aws_access_key_id=access_key,
78                    aws_secret_access_key=secret_key,
79                    aws_session_token=session_token,
80                )
81                return self._session
82            else:
83                return self._session

Create boto3 session with configured credentials.

client
 85    @property
 86    def client(self):
 87        """Create S3 client with configured endpoint and region."""
 88        endpoint_url = CONFIG.get(
 89            f"storage.{self.usage.value}.{self.name}.endpoint",
 90            CONFIG.get(f"storage.{self.name}.endpoint", None),
 91        )
 92        use_ssl = CONFIG.get(
 93            f"storage.{self.usage.value}.{self.name}.use_ssl",
 94            CONFIG.get(f"storage.{self.name}.use_ssl", True),
 95        )
 96        region_name = CONFIG.get(
 97            f"storage.{self.usage.value}.{self.name}.region",
 98            CONFIG.get(f"storage.{self.name}.region", None),
 99        )
100        addressing_style = CONFIG.get(
101            f"storage.{self.usage.value}.{self.name}.addressing_style",
102            CONFIG.get(f"storage.{self.name}.addressing_style", "auto"),
103        )
104        signature_version = CONFIG.get(
105            f"storage.{self.usage.value}.{self.name}.signature_version",
106            CONFIG.get(f"storage.{self.name}.signature_version", "s3v4"),
107        )
108        # Keep signature_version pass-through and let boto3/botocore handle it.
109        # In boto3's S3 configuration docs, `s3v4` (default) and deprecated `s3`
110        # are the documented values:
111        # https://github.com/boto/boto3/blob/791a3e8f36d83664a47b4281a0586b3546cef3ec/docs/source/guide/configuration.rst?plain=1#L398-L407
112        # Botocore also supports additional signer names, so we intentionally do
113        # not enforce a restricted allowlist here.
114
115        return self.session.client(
116            "s3",
117            endpoint_url=endpoint_url,
118            use_ssl=use_ssl,
119            region_name=region_name,
120            config=Config(
121                signature_version=signature_version, s3={"addressing_style": addressing_style}
122            ),
123        )

Create S3 client with configured endpoint and region.

manageable: bool
125    @property
126    def manageable(self) -> bool:
127        return True

Whether this backend can actually be used for management.

Used only for the management check, not for creating the backend.

def supports_file(self, name: str) -> bool:
129    def supports_file(self, name: str) -> bool:
130        """We support all files"""
131        return True

We support all files

def list_files(self) -> Generator[str]:
133    def list_files(self) -> Generator[str]:
134        """List all files returning relative paths from base_path."""
135        paginator = self.client.get_paginator("list_objects_v2")
136        pages = paginator.paginate(Bucket=self.bucket_name, Prefix=f"{self.base_path}/")
137
138        for page in pages:
139            for obj in page.get("Contents", []):
140                key = obj["Key"]
141                # Remove base path prefix to get relative path
142                rel_path = key.removeprefix(f"{self.base_path}/")
143                if rel_path:  # Skip if it's just the directory itself
144                    yield rel_path

List all files returning relative paths from base_path.

def file_url( self, name: str, request: django.http.request.HttpRequest | None = None, use_cache: bool = True) -> str:
146    def file_url(
147        self,
148        name: str,
149        request: HttpRequest | None = None,
150        use_cache: bool = True,
151    ) -> str:
152        """Generate presigned URL for file access."""
153        use_https = CONFIG.get_bool(
154            f"storage.{self.usage.value}.{self.name}.secure_urls",
155            CONFIG.get_bool(f"storage.{self.name}.secure_urls", True),
156        )
157
158        expires_in = int(
159            timedelta_from_string(
160                CONFIG.get(
161                    f"storage.{self.usage.value}.{self.name}.url_expiry",
162                    CONFIG.get(f"storage.{self.name}.url_expiry", "minutes=15"),
163                )
164            ).total_seconds()
165        )
166
167        def _file_url(name: str, request: HttpRequest | None) -> str:
168            client = self.client
169            params = {
170                "Bucket": self.bucket_name,
171                "Key": f"{self.base_path}/{name}",
172            }
173
174            operation_name = "GetObject"
175            operation_model = client.meta.service_model.operation_model(operation_name)
176            request_dict = client._convert_to_request_dict(
177                params,
178                operation_model,
179                endpoint_url=client.meta.endpoint_url,
180                context={"is_presign_request": True},
181            )
182
183            # Support custom domain for S3-compatible storage (so not AWS)
184            # Well, can't you do custom domains on AWS as well?
185            custom_domain = CONFIG.get(
186                f"storage.{self.usage.value}.{self.name}.custom_domain",
187                CONFIG.get(f"storage.{self.name}.custom_domain", None),
188            )
189            if custom_domain:
190                scheme = "https" if use_https else "http"
191                path = request_dict["url_path"]
192
193                # When using path-style addressing, the presigned URL contains the bucket
194                # name in the path (e.g., /bucket-name/key). Since custom_domain must
195                # include the bucket name (per docs), strip it from the path to avoid
196                # duplication. See: https://github.com/goauthentik/authentik/issues/19521
197                # Check with trailing slash to ensure exact bucket name match
198                if path.startswith(f"/{self.bucket_name}/"):
199                    path = path.removeprefix(f"/{self.bucket_name}")
200
201                # Normalize to avoid double slashes
202                custom_domain = custom_domain.rstrip("/")
203                if not path.startswith("/"):
204                    path = f"/{path}"
205
206                custom_base = urlsplit(f"{scheme}://{custom_domain}")
207
208                # Sign the final public URL instead of signing the internal S3 endpoint and
209                # rewriting it afterwards. Presigned SigV4 URLs include the host header in the
210                # canonical request, so post-sign host changes break strict backends like RustFS.
211                public_path = f"{custom_base.path.rstrip('/')}{path}" if custom_base.path else path
212                request_dict["url_path"] = public_path
213                request_dict["url"] = urlunsplit(
214                    (custom_base.scheme, custom_base.netloc, public_path, "", "")
215                )
216
217            return client._request_signer.generate_presigned_url(
218                request_dict,
219                operation_name,
220                expires_in=expires_in,
221            )
222
223        if use_cache:
224            return self._cache_get_or_set(name, request, _file_url, expires_in)
225        else:
226            return _file_url(name, request)

Generate presigned URL for file access.

def save_file(self, name: str, content: bytes) -> None:
228    def save_file(self, name: str, content: bytes) -> None:
229        """Save file to S3."""
230        self.client.put_object(
231            Bucket=self.bucket_name,
232            Key=f"{self.base_path}/{name}",
233            Body=content,
234            ACL="private",
235            ContentType=get_content_type(name),
236        )

Save file to S3.

@contextmanager
def save_file_stream(self, name: str) -> Iterator:
238    @contextmanager
239    def save_file_stream(self, name: str) -> Iterator:
240        """Context manager for streaming file writes to S3."""
241        # Keep files in memory up to 5 MB
242        with SpooledTemporaryFile(max_size=5 * 1024 * 1024, suffix=".S3File") as file:
243            yield file
244            file.seek(0)
245            self.client.upload_fileobj(
246                Fileobj=file,
247                Bucket=self.bucket_name,
248                Key=f"{self.base_path}/{name}",
249                ExtraArgs={
250                    "ACL": "private",
251                    "ContentType": get_content_type(name),
252                },
253            )

Context manager for streaming file writes to S3.

def delete_file(self, name: str) -> None:
255    def delete_file(self, name: str) -> None:
256        """Delete file from S3."""
257        self.client.delete_object(
258            Bucket=self.bucket_name,
259            Key=f"{self.base_path}/{name}",
260        )

Delete file from S3.

def file_exists(self, name: str) -> bool:
262    def file_exists(self, name: str) -> bool:
263        """Check if a file exists in S3."""
264        try:
265            self.client.head_object(
266                Bucket=self.bucket_name,
267                Key=f"{self.base_path}/{name}",
268            )
269            return True
270        except ClientError:
271            return False

Check if a file exists in S3.