authentik.policies.types

policy structures

 1"""policy structures"""
 2
 3from __future__ import annotations
 4
 5from dataclasses import dataclass
 6from typing import TYPE_CHECKING, Any
 7
 8from django.db.models import Model
 9from django.http import HttpRequest
10from structlog.stdlib import get_logger
11
12from authentik.events.context_processors.base import get_context_processors
13
14if TYPE_CHECKING:
15    from authentik.core.models import User
16    from authentik.events.logs import LogEvent
17    from authentik.policies.models import PolicyBinding
18
19LOGGER = get_logger()
20CACHE_PREFIX = "goauthentik.io/policies/"
21
22
23@dataclass(slots=True)
24class PolicyRequest:
25    """Data-class to hold policy request data"""
26
27    user: User
28    http_request: HttpRequest | None
29    obj: Model | None
30    context: dict[str, Any]
31    debug: bool
32
33    def __init__(self, user: User):
34        self.user = user
35        self.http_request = None
36        self.obj = None
37        self.context = {}
38        self.debug = False
39
40    def set_http_request(self, request: HttpRequest):  # pragma: no cover
41        """Load data from HTTP request, including geoip when enabled"""
42        self.http_request = request
43        for processor in get_context_processors():
44            self.context.update(processor.enrich_context(request))
45
46    @property
47    def should_cache(self) -> bool:
48        """Check if this request's result should be cached"""
49        if not self.user.is_authenticated:
50            return False
51        if self.debug:
52            return False
53        return True
54
55    def __repr__(self) -> str:
56        return self.__str__()
57
58    def __str__(self):
59        text = f"<PolicyRequest user={self.user}"
60        if self.obj:
61            text += f" obj={self.obj}"
62        if self.http_request:
63            text += f" http_request={self.http_request}"
64        return text + ">"
65
66
67@dataclass(slots=True)
68class PolicyResult:
69    """Result from evaluating a policy."""
70
71    passing: bool
72    messages: tuple[str, ...]
73    raw_result: Any
74
75    source_binding: PolicyBinding | None
76    source_results: list[PolicyResult] | None
77
78    log_messages: list[LogEvent] | None
79
80    _exec_time: int | None
81
82    def __init__(self, passing: bool, *messages: str):
83        self.passing = passing
84        self.messages = messages
85        self.raw_result = None
86        self.source_binding = None
87        self.source_results = []
88        self.log_messages = []
89        self._exec_time = None
90
91    def __repr__(self):
92        return self.__str__()
93
94    def __str__(self):
95        if self.messages:
96            return f"<PolicyResult passing={self.passing} messages={self.messages}>"
97        return f"<PolicyResult passing={self.passing}>"
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
CACHE_PREFIX = 'goauthentik.io/policies/'
@dataclass(slots=True)
class PolicyRequest:
24@dataclass(slots=True)
25class PolicyRequest:
26    """Data-class to hold policy request data"""
27
28    user: User
29    http_request: HttpRequest | None
30    obj: Model | None
31    context: dict[str, Any]
32    debug: bool
33
34    def __init__(self, user: User):
35        self.user = user
36        self.http_request = None
37        self.obj = None
38        self.context = {}
39        self.debug = False
40
41    def set_http_request(self, request: HttpRequest):  # pragma: no cover
42        """Load data from HTTP request, including geoip when enabled"""
43        self.http_request = request
44        for processor in get_context_processors():
45            self.context.update(processor.enrich_context(request))
46
47    @property
48    def should_cache(self) -> bool:
49        """Check if this request's result should be cached"""
50        if not self.user.is_authenticated:
51            return False
52        if self.debug:
53            return False
54        return True
55
56    def __repr__(self) -> str:
57        return self.__str__()
58
59    def __str__(self):
60        text = f"<PolicyRequest user={self.user}"
61        if self.obj:
62            text += f" obj={self.obj}"
63        if self.http_request:
64            text += f" http_request={self.http_request}"
65        return text + ">"

Data-class to hold policy request data

PolicyRequest(user: authentik.core.models.User)
34    def __init__(self, user: User):
35        self.user = user
36        self.http_request = None
37        self.obj = None
38        self.context = {}
39        self.debug = False
http_request: django.http.request.HttpRequest | None
obj: django.db.models.base.Model | None
context: dict[str, typing.Any]
debug: bool
def set_http_request(self, request: django.http.request.HttpRequest):
41    def set_http_request(self, request: HttpRequest):  # pragma: no cover
42        """Load data from HTTP request, including geoip when enabled"""
43        self.http_request = request
44        for processor in get_context_processors():
45            self.context.update(processor.enrich_context(request))

Load data from HTTP request, including geoip when enabled

should_cache: bool
47    @property
48    def should_cache(self) -> bool:
49        """Check if this request's result should be cached"""
50        if not self.user.is_authenticated:
51            return False
52        if self.debug:
53            return False
54        return True

Check if this request's result should be cached

@dataclass(slots=True)
class PolicyResult:
68@dataclass(slots=True)
69class PolicyResult:
70    """Result from evaluating a policy."""
71
72    passing: bool
73    messages: tuple[str, ...]
74    raw_result: Any
75
76    source_binding: PolicyBinding | None
77    source_results: list[PolicyResult] | None
78
79    log_messages: list[LogEvent] | None
80
81    _exec_time: int | None
82
83    def __init__(self, passing: bool, *messages: str):
84        self.passing = passing
85        self.messages = messages
86        self.raw_result = None
87        self.source_binding = None
88        self.source_results = []
89        self.log_messages = []
90        self._exec_time = None
91
92    def __repr__(self):
93        return self.__str__()
94
95    def __str__(self):
96        if self.messages:
97            return f"<PolicyResult passing={self.passing} messages={self.messages}>"
98        return f"<PolicyResult passing={self.passing}>"

Result from evaluating a policy.

PolicyResult(passing: bool, *messages: str)
83    def __init__(self, passing: bool, *messages: str):
84        self.passing = passing
85        self.messages = messages
86        self.raw_result = None
87        self.source_binding = None
88        self.source_results = []
89        self.log_messages = []
90        self._exec_time = None
passing: bool
messages: tuple[str, ...]
raw_result: Any
source_results: list[PolicyResult] | None
log_messages: list[authentik.events.logs.LogEvent] | None