authentik.policies.event_matcher.models
Event Matcher models
"""Event Matcher models"""

from itertools import chain

from django.db import models
from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer
from structlog.stdlib import get_logger

from authentik.events.models import Event, EventAction
from authentik.policies.models import Policy
from authentik.policies.types import PolicyRequest, PolicyResult

LOGGER = get_logger()


class EventMatcherPolicy(Policy):
    """Passes when Event matches selected criteria."""

    # Every criterion below is nullable and defaults to None. A criterion left
    # at None is skipped by its `passes_*` check, so it never restricts the
    # match.

    # Compared for equality against `event.action`.
    action = models.TextField(
        choices=EventAction.choices,
        null=True,
        default=None,
        help_text=_(
            "Match created events with this action type. "
            "When left empty, all action types will be matched."
        ),
    )
    # Compared for equality against `event.app`.
    app = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected application. "
            "When left empty, all applications are matched."
        ),
    )
    # Compared against "<app>.<model_name>" built from `event.context["model"]`.
    model = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected model. "
            "When left empty, all models are matched. When an app is selected, "
            "all the application's models are matched."
        ),
    )
    # Compared for equality against `event.client_ip`.
    client_ip = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Matches Event's Client IP (strict matching, "
            "for network matching use an Expression Policy)"
        ),
    )

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Get serializer for this model"""
        # Imported at call time rather than module level.
        # NOTE(review): presumably avoids a circular import with the api module - confirm.
        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer

        return EventMatcherPolicySerializer

    @property
    def component(self) -> str:
        """Return the name of the UI component for this policy's form"""
        return "ak-policy-event-matcher-form"

    def passes(self, request: PolicyRequest) -> PolicyResult:
        """Check if request passes this policy

        The event is read from ``request.context["event"]``; when that key is
        absent the policy fails immediately. Otherwise every criterion check is
        run, checks returning ``None`` (criterion not configured) are skipped,
        and the policy passes only if all remaining checks pass. When no
        criterion is configured the result is passing, because ``all()`` of an
        empty sequence is true.

        The returned result carries the messages of every evaluated check, in
        check order, and the individual results in ``source_results``.
        """
        if "event" not in request.context:
            return PolicyResult(False)
        event: Event = request.context["event"]
        matches: list[PolicyResult] = []
        checks = [
            self.passes_action,
            self.passes_client_ip,
            self.passes_app,
            self.passes_model,
        ]
        for checker in checks:
            result = checker(request, event)
            if result is None:
                continue
            LOGGER.debug(
                "Event matcher check result",
                checker=checker.__name__,
                result=result,
            )
            matches.append(result)
        passing = all(x.passing for x in matches)
        # Flatten the per-check messages lazily instead of building a list
        # only to star-unpack it.
        messages = chain.from_iterable(x.messages for x in matches)
        combined = PolicyResult(passing, *messages)
        combined.source_results = matches
        return combined

    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.action` matches

        Returns ``None`` when no action is configured, otherwise a result that
        passes when ``self.action`` equals ``event.action``.
        """
        if self.action is None:
            return None
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(self.action == event.action, "Action matched.")

    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.client_ip` matches

        Returns ``None`` when no client IP is configured, otherwise a result
        that passes when ``self.client_ip`` equals ``event.client_ip``.
        """
        if self.client_ip is None:
            return None
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")

    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.app` matches

        Returns ``None`` when no app is configured, otherwise a result that
        passes when ``self.app`` equals ``event.app``.
        """
        if self.app is None:
            return None
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(self.app == event.app, "App matched.")

    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.model` is set, and pass if it matches the event's model

        The event's model is rendered as ``"<app>.<model_name>"`` from the
        ``"model"`` entry of ``event.context``. Missing keys render as the
        string ``"None"`` and therefore do not match a configured model.
        """
        if self.model is None:
            return None
        event_model_info = event.context.get("model", {})
        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(event_model == self.model, "Model matched.")

    class Meta(Policy.PolicyMeta):
        verbose_name = _("Event Matcher Policy")
        verbose_name_plural = _("Event Matcher Policies")
class EventMatcherPolicy(Policy):
    """Passes when Event matches selected criteria."""

    # Every criterion below is nullable and defaults to None. A criterion left
    # at None is skipped by its `passes_*` check, so it never restricts the
    # match.

    # Compared for equality against `event.action`.
    action = models.TextField(
        choices=EventAction.choices,
        null=True,
        default=None,
        help_text=_(
            "Match created events with this action type. "
            "When left empty, all action types will be matched."
        ),
    )
    # Compared for equality against `event.app`.
    app = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected application. "
            "When left empty, all applications are matched."
        ),
    )
    # Compared against "<app>.<model_name>" built from `event.context["model"]`.
    model = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected model. "
            "When left empty, all models are matched. When an app is selected, "
            "all the application's models are matched."
        ),
    )
    # Compared for equality against `event.client_ip`.
    client_ip = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Matches Event's Client IP (strict matching, "
            "for network matching use an Expression Policy)"
        ),
    )

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Get serializer for this model"""
        # Imported at call time rather than module level.
        # NOTE(review): presumably avoids a circular import with the api module - confirm.
        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer

        return EventMatcherPolicySerializer

    @property
    def component(self) -> str:
        """Return the name of the UI component for this policy's form"""
        return "ak-policy-event-matcher-form"

    def passes(self, request: PolicyRequest) -> PolicyResult:
        """Check if request passes this policy

        The event is read from ``request.context["event"]``; when that key is
        absent the policy fails immediately. Otherwise every criterion check is
        run, checks returning ``None`` (criterion not configured) are skipped,
        and the policy passes only if all remaining checks pass. When no
        criterion is configured the result is passing, because ``all()`` of an
        empty sequence is true.

        The returned result carries the messages of every evaluated check, in
        check order, and the individual results in ``source_results``.
        """
        if "event" not in request.context:
            return PolicyResult(False)
        event: Event = request.context["event"]
        matches: list[PolicyResult] = []
        checks = [
            self.passes_action,
            self.passes_client_ip,
            self.passes_app,
            self.passes_model,
        ]
        for checker in checks:
            result = checker(request, event)
            if result is None:
                continue
            LOGGER.debug(
                "Event matcher check result",
                checker=checker.__name__,
                result=result,
            )
            matches.append(result)
        passing = all(x.passing for x in matches)
        # Flatten the per-check messages lazily instead of building a list
        # only to star-unpack it.
        messages = chain.from_iterable(x.messages for x in matches)
        combined = PolicyResult(passing, *messages)
        combined.source_results = matches
        return combined

    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.action` matches

        Returns ``None`` when no action is configured, otherwise a result that
        passes when ``self.action`` equals ``event.action``.
        """
        if self.action is None:
            return None
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(self.action == event.action, "Action matched.")

    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.client_ip` matches

        Returns ``None`` when no client IP is configured, otherwise a result
        that passes when ``self.client_ip`` equals ``event.client_ip``.
        """
        if self.client_ip is None:
            return None
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")

    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.app` matches

        Returns ``None`` when no app is configured, otherwise a result that
        passes when ``self.app`` equals ``event.app``.
        """
        if self.app is None:
            return None
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(self.app == event.app, "App matched.")

    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.model` is set, and pass if it matches the event's model

        The event's model is rendered as ``"<app>.<model_name>"`` from the
        ``"model"`` entry of ``event.context``. Missing keys render as the
        string ``"None"`` and therefore do not match a configured model.
        """
        if self.model is None:
            return None
        event_model_info = event.context.get("model", {})
        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(event_model == self.model, "Model matched.")

    class Meta(Policy.PolicyMeta):
        verbose_name = _("Event Matcher Policy")
        verbose_name_plural = _("Event Matcher Policies")
Passes when Event matches selected criteria.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[BaseSerializer]:
    """Get serializer for this model"""
    # The import happens on access rather than at module load.
    # NOTE(review): presumably avoids a circular import with the api module - confirm.
    from authentik.policies.event_matcher.api import (
        EventMatcherPolicySerializer as serializer_class,
    )

    return serializer_class
Get serializer for this model
def passes(self, request: PolicyRequest) -> PolicyResult:
    """Check if request passes this policy

    The event is read from ``request.context["event"]``; when that key is
    absent the policy fails immediately. Otherwise each of the four criterion
    checks is run, checks returning ``None`` are skipped, and the policy
    passes only if all remaining checks pass. When every check is skipped the
    result is passing, because ``all()`` of an empty sequence is true.

    The returned result carries the messages of every evaluated check, in
    check order, and the individual results in ``source_results``.
    """
    if "event" not in request.context:
        return PolicyResult(False)
    event: Event = request.context["event"]
    matches: list[PolicyResult] = []
    checks = [
        self.passes_action,
        self.passes_client_ip,
        self.passes_app,
        self.passes_model,
    ]
    for checker in checks:
        result = checker(request, event)
        if result is None:
            continue
        LOGGER.debug(
            "Event matcher check result",
            checker=checker.__name__,
            result=result,
        )
        matches.append(result)
    passing = all(x.passing for x in matches)
    # Flatten the per-check messages lazily instead of building a list only
    # to star-unpack it.
    messages = chain.from_iterable(x.messages for x in matches)
    combined = PolicyResult(passing, *messages)
    combined.source_results = matches
    return combined
Check if request passes this policy
def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured action with the event's action.

    Returns ``None`` when no action is configured, otherwise a result that
    passes when ``self.action`` equals ``event.action``.
    """
    if self.action is not None:
        is_match = self.action == event.action
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(is_match, "Action matched.")
    return None
Check if self.action matches
def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured client IP with the event's client IP.

    Returns ``None`` when no client IP is configured, otherwise a result that
    passes when ``self.client_ip`` equals ``event.client_ip``.
    """
    if self.client_ip is not None:
        is_match = self.client_ip == event.client_ip
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(is_match, "Client IP matched.")
    return None
Check if self.client_ip matches
def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured app with the event's app.

    Returns ``None`` when no app is configured, otherwise a result that
    passes when ``self.app`` equals ``event.app``.
    """
    if self.app is not None:
        is_match = self.app == event.app
        # NOTE(review): the message is attached whether or not the comparison passed.
        return PolicyResult(is_match, "App matched.")
    return None
Check if self.app matches
def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured model with the model recorded on the event.

    Returns ``None`` when no model is configured. Otherwise the event's model
    is rendered as ``"<app>.<model_name>"`` from the ``"model"`` entry of
    ``event.context`` and compared with ``self.model``. Missing keys render as
    the string ``"None"``.
    """
    if self.model is None:
        return None
    model_info = event.context.get("model", {})
    app_label = model_info.get("app")
    model_name = model_info.get("model_name")
    dotted_name = f"{app_label}.{model_name}"
    # NOTE(review): the message is attached whether or not the comparison passed.
    return PolicyResult(dotted_name == self.model, "Model matched.")
Check if self.model is set, and pass if it matches the event's model
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.policies.models.Policy
- policy_uuid
- name
- execution_logging
- objects
- PolicyMeta
- created
- last_updated
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- bindings
- dummypolicy
- eventmatcherpolicy
- passwordexpirypolicy
- reputationpolicy
- expressionpolicy
- geoippolicy
- promptstage_set
- passwordpolicy
- uniquepasswordpolicy
The requested object does not exist
The query returned multiple objects when only one was expected.