authentik.policies.event_matcher.models
Event Matcher models
"""Event Matcher models"""

from itertools import chain

from django.db import models
from django.utils.translation import gettext as _
from djangoql.queryset import apply_search
from rest_framework.serializers import BaseSerializer
from structlog.stdlib import get_logger

from authentik.api.search.ql import BaseSchema
from authentik.events.models import Event, EventAction
from authentik.policies.models import Policy
from authentik.policies.types import PolicyRequest, PolicyResult

LOGGER = get_logger()


class EventMatcherPolicy(Policy):
    """Passes when Event matches selected criteria.

    Each criterion (`query`, `action`, `client_ip`, `app`, `model`) is optional.
    Criteria that are not configured are skipped; every configured criterion must
    match for the policy to pass.
    """

    # Search query applied to the event in `passes_query`; skipped when empty.
    query = models.TextField(
        null=True,
        default=None,
    )
    action = models.TextField(
        choices=EventAction.choices,
        null=True,
        default=None,
        help_text=_(
            "Match created events with this action type. "
            "When left empty, all action types will be matched."
        ),
    )
    app = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected application. "
            "When left empty, all applications are matched."
        ),
    )
    model = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected model. "
            "When left empty, all models are matched. When an app is selected, "
            "all the application's models are matched."
        ),
    )
    client_ip = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Matches Event's Client IP (strict matching, "
            "for network matching use an Expression Policy)"
        ),
    )

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this policy model."""
        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer

        return EventMatcherPolicySerializer

    @property
    def component(self) -> str:
        """Return the name of the UI form component for this policy."""
        return "ak-policy-event-matcher-form"

    def passes(self, request: PolicyRequest) -> PolicyResult:
        """Check whether the event in the request context matches all configured criteria.

        Returns a failing result when the request context has no "event" key.
        Otherwise every check is run; checks returning None (criterion not
        configured) are ignored. The result passes only if all remaining checks
        pass, which also means it passes when no criterion is configured. The
        individual check results are attached as `source_results`.
        """
        if "event" not in request.context:
            return PolicyResult(False)
        event: Event = request.context["event"]
        checks = [
            self.passes_query,
            self.passes_action,
            self.passes_client_ip,
            self.passes_app,
            self.passes_model,
        ]
        matches: list[PolicyResult] = []
        for checker in checks:
            result = checker(request, event)
            if result is None:
                # Criterion is not configured on this policy, nothing to evaluate
                continue
            LOGGER.debug(
                "Event matcher check result",
                checker=checker.__name__,
                result=result,
            )
            matches.append(result)
        passing = all(x.passing for x in matches)
        messages = chain.from_iterable(x.messages for x in matches)
        result = PolicyResult(passing, *messages)
        result.source_results = matches
        return result

    def passes_query(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check AKQL query

        Returns None when no query is configured. Otherwise the query is applied
        to a queryset restricted to this event's primary key, and the check passes
        when that queryset still contains a row.
        """
        if not self.query:
            return None
        from authentik.events.api.events import EventViewSet

        class InlineSchema(BaseSchema):
            """Search schema using the fields returned by `EventViewSet.get_ql_fields`"""

            def get_fields(self, model):
                return EventViewSet().get_ql_fields()

        # Filtering by pk limits the search to this single event
        # NOTE(review): presumably the event must already be saved for this to match; confirm.
        qs = apply_search(Event.objects.filter(pk=event.pk), self.query, InlineSchema)
        return PolicyResult(qs.exists(), "Query matched.")

    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.action` matches; returns None when no action is configured"""
        if self.action is None:
            return None
        return PolicyResult(self.action == event.action, "Action matched.")

    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.client_ip` matches; returns None when no client IP is configured"""
        if self.client_ip is None:
            return None
        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")

    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.app` matches; returns None when no app is configured"""
        if self.app is None:
            return None
        return PolicyResult(self.app == event.app, "App matched.")

    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.model` is set, and pass if it matches the event's model

        The event's model is read from `event.context["model"]` and compared as
        "<app>.<model_name>". Returns None when no model is configured.
        """
        if self.model is None:
            return None
        event_model_info = event.context.get("model", {})
        # Missing keys yield the literal "None", which will not equal a configured model
        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
        return PolicyResult(event_model == self.model, "Model matched.")

    class Meta(Policy.PolicyMeta):
        verbose_name = _("Event Matcher Policy")
        verbose_name_plural = _("Event Matcher Policies")
class EventMatcherPolicy(Policy):
    """Passes when Event matches selected criteria.

    Each criterion (`query`, `action`, `client_ip`, `app`, `model`) is optional.
    Criteria that are not configured are skipped; every configured criterion must
    match for the policy to pass.
    """

    # Search query applied to the event in `passes_query`; skipped when empty.
    query = models.TextField(
        null=True,
        default=None,
    )
    action = models.TextField(
        choices=EventAction.choices,
        null=True,
        default=None,
        help_text=_(
            "Match created events with this action type. "
            "When left empty, all action types will be matched."
        ),
    )
    app = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected application. "
            "When left empty, all applications are matched."
        ),
    )
    model = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Match events created by selected model. "
            "When left empty, all models are matched. When an app is selected, "
            "all the application's models are matched."
        ),
    )
    client_ip = models.TextField(
        null=True,
        default=None,
        help_text=_(
            "Matches Event's Client IP (strict matching, "
            "for network matching use an Expression Policy)"
        ),
    )

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this policy model."""
        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer

        return EventMatcherPolicySerializer

    @property
    def component(self) -> str:
        """Return the name of the UI form component for this policy."""
        return "ak-policy-event-matcher-form"

    def passes(self, request: PolicyRequest) -> PolicyResult:
        """Check whether the event in the request context matches all configured criteria.

        Returns a failing result when the request context has no "event" key.
        Otherwise every check is run; checks returning None (criterion not
        configured) are ignored. The result passes only if all remaining checks
        pass, which also means it passes when no criterion is configured. The
        individual check results are attached as `source_results`.
        """
        if "event" not in request.context:
            return PolicyResult(False)
        event: Event = request.context["event"]
        checks = [
            self.passes_query,
            self.passes_action,
            self.passes_client_ip,
            self.passes_app,
            self.passes_model,
        ]
        matches: list[PolicyResult] = []
        for checker in checks:
            result = checker(request, event)
            if result is None:
                # Criterion is not configured on this policy, nothing to evaluate
                continue
            LOGGER.debug(
                "Event matcher check result",
                checker=checker.__name__,
                result=result,
            )
            matches.append(result)
        passing = all(x.passing for x in matches)
        messages = chain.from_iterable(x.messages for x in matches)
        result = PolicyResult(passing, *messages)
        result.source_results = matches
        return result

    def passes_query(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check AKQL query

        Returns None when no query is configured. Otherwise the query is applied
        to a queryset restricted to this event's primary key, and the check passes
        when that queryset still contains a row.
        """
        if not self.query:
            return None
        from authentik.events.api.events import EventViewSet

        class InlineSchema(BaseSchema):
            """Search schema using the fields returned by `EventViewSet.get_ql_fields`"""

            def get_fields(self, model):
                return EventViewSet().get_ql_fields()

        # Filtering by pk limits the search to this single event
        # NOTE(review): presumably the event must already be saved for this to match; confirm.
        qs = apply_search(Event.objects.filter(pk=event.pk), self.query, InlineSchema)
        return PolicyResult(qs.exists(), "Query matched.")

    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.action` matches; returns None when no action is configured"""
        if self.action is None:
            return None
        return PolicyResult(self.action == event.action, "Action matched.")

    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.client_ip` matches; returns None when no client IP is configured"""
        if self.client_ip is None:
            return None
        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")

    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.app` matches; returns None when no app is configured"""
        if self.app is None:
            return None
        return PolicyResult(self.app == event.app, "App matched.")

    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
        """Check if `self.model` is set, and pass if it matches the event's model

        The event's model is read from `event.context["model"]` and compared as
        "<app>.<model_name>". Returns None when no model is configured.
        """
        if self.model is None:
            return None
        event_model_info = event.context.get("model", {})
        # Missing keys yield the literal "None", which will not equal a configured model
        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
        return PolicyResult(event_model == self.model, "Model matched.")

    class Meta(Policy.PolicyMeta):
        verbose_name = _("Event Matcher Policy")
        verbose_name_plural = _("Event Matcher Policies")
Passes when Event matches selected criteria.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[BaseSerializer]:
    """Return the serializer class used for this policy model."""
    # Imported at call time rather than at module level.
    # NOTE(review): presumably this avoids a circular import with the API module; confirm.
    from authentik.policies.event_matcher import api as event_matcher_api

    return event_matcher_api.EventMatcherPolicySerializer
Get serializer for this model
def passes(self, request: PolicyRequest) -> PolicyResult:
    """Check whether the event in the request context matches all configured criteria.

    Returns a failing result when the request context has no "event" key.
    Otherwise every check is run; checks returning None (criterion not
    configured) are ignored. The result passes only if all remaining checks
    pass, which also means it passes when no criterion is configured. The
    individual check results are attached as `source_results`.
    """
    if "event" not in request.context:
        return PolicyResult(False)
    event: Event = request.context["event"]
    checks = [
        self.passes_query,
        self.passes_action,
        self.passes_client_ip,
        self.passes_app,
        self.passes_model,
    ]
    matches: list[PolicyResult] = []
    for checker in checks:
        result = checker(request, event)
        if result is None:
            # Criterion is not configured on this policy, nothing to evaluate
            continue
        LOGGER.debug(
            "Event matcher check result",
            checker=checker.__name__,
            result=result,
        )
        matches.append(result)
    passing = all(x.passing for x in matches)
    # Flatten the messages of every evaluated check into the combined result
    messages = chain.from_iterable(x.messages for x in matches)
    result = PolicyResult(passing, *messages)
    result.source_results = matches
    return result
Check if request passes this policy
def passes_query(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Check AKQL query

    Returns None when no query is configured. Otherwise the query is applied
    to a queryset restricted to this event's primary key, and the check passes
    when that queryset still contains a row.
    """
    if not self.query:
        return None
    from authentik.events.api.events import EventViewSet

    class InlineSchema(BaseSchema):
        """Search schema using the fields returned by `EventViewSet.get_ql_fields`"""

        def get_fields(self, model):
            return EventViewSet().get_ql_fields()

    # Filtering by pk limits the search to this single event
    # NOTE(review): presumably the event must already be saved for this to match; confirm.
    qs = apply_search(Event.objects.filter(pk=event.pk), self.query, InlineSchema)
    return PolicyResult(qs.exists(), "Query matched.")
Check AKQL query
def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured action against the event's action.

    Returns None when no action is configured, otherwise a result whose
    passing state reflects whether both actions are equal.
    """
    wanted_action = self.action
    if wanted_action is None:
        return None
    is_match = wanted_action == event.action
    return PolicyResult(is_match, "Action matched.")
Check if `self.action` matches
def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured client IP against the event's client IP.

    Returns None when no client IP is configured, otherwise a result whose
    passing state reflects whether both values are exactly equal.
    """
    wanted_ip = self.client_ip
    if wanted_ip is None:
        return None
    is_match = wanted_ip == event.client_ip
    return PolicyResult(is_match, "Client IP matched.")
Check if `self.client_ip` matches
def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured app against the event's app.

    Returns None when no app is configured, otherwise a result whose
    passing state reflects whether both values are equal.
    """
    wanted_app = self.app
    if wanted_app is None:
        return None
    is_match = wanted_app == event.app
    return PolicyResult(is_match, "App matched.")
Check if `self.app` matches
def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
    """Compare the configured model against the model recorded in the event context.

    Returns None when no model is configured. Otherwise the "app" and
    "model_name" entries of `event.context["model"]` are joined as
    "<app>.<model_name>" and compared with `self.model`.
    """
    if self.model is None:
        return None
    model_info = event.context.get("model", {})
    app_label = model_info.get("app")
    model_name = model_info.get("model_name")
    # Missing entries are formatted as the literal "None"
    qualified_name = f"{app_label}.{model_name}"
    return PolicyResult(qualified_name == self.model, "Model matched.")
Check if `self.model` is set, and pass if it matches the event's model
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::

    class Restaurant(Model):
        place = OneToOneField(Place, related_name='restaurant')

`Restaurant.place` is a `ForwardOneToOneDescriptor` instance.
Inherited Members
- authentik.policies.models.Policy
- policy_uuid
- name
- execution_logging
- objects
- PolicyMeta
- created
- last_updated
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- bindings
- dummypolicy
- eventmatcherpolicy
- passwordexpirypolicy
- reputationpolicy
- expressionpolicy
- geoippolicy
- promptstage_set
- passwordpolicy
- uniquepasswordpolicy
The requested object does not exist
The query returned multiple objects when only one was expected.