authentik.policies.event_matcher.models

Event Matcher models

  1"""Event Matcher models"""
  2
  3from itertools import chain
  4
  5from django.db import models
  6from django.utils.translation import gettext as _
  7from djangoql.queryset import apply_search
  8from rest_framework.serializers import BaseSerializer
  9from structlog.stdlib import get_logger
 10
 11from authentik.api.search.ql import BaseSchema
 12from authentik.events.models import Event, EventAction
 13from authentik.policies.models import Policy
 14from authentik.policies.types import PolicyRequest, PolicyResult
 15
 16LOGGER = get_logger()
 17
 18
 19class EventMatcherPolicy(Policy):
 20    """Passes when Event matches selected criteria."""
 21
 22    query = models.TextField(
 23        null=True,
 24        default=None,
 25    )
 26    action = models.TextField(
 27        choices=EventAction.choices,
 28        null=True,
 29        default=None,
 30        help_text=_(
 31            "Match created events with this action type. "
 32            "When left empty, all action types will be matched."
 33        ),
 34    )
 35    app = models.TextField(
 36        null=True,
 37        default=None,
 38        help_text=_(
 39            "Match events created by selected application. "
 40            "When left empty, all applications are matched."
 41        ),
 42    )
 43    model = models.TextField(
 44        null=True,
 45        default=None,
 46        help_text=_(
 47            "Match events created by selected model. "
 48            "When left empty, all models are matched. When an app is selected, "
 49            "all the application's models are matched."
 50        ),
 51    )
 52    client_ip = models.TextField(
 53        null=True,
 54        default=None,
 55        help_text=_(
 56            "Matches Event's Client IP (strict matching, "
 57            "for network matching use an Expression Policy)"
 58        ),
 59    )
 60
 61    @property
 62    def serializer(self) -> type[BaseSerializer]:
 63        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer
 64
 65        return EventMatcherPolicySerializer
 66
 67    @property
 68    def component(self) -> str:
 69        return "ak-policy-event-matcher-form"
 70
 71    def passes(self, request: PolicyRequest) -> PolicyResult:
 72        if "event" not in request.context:
 73            return PolicyResult(False)
 74        event: Event = request.context["event"]
 75        matches: list[PolicyResult] = []
 76        messages = []
 77        checks = [
 78            self.passes_query,
 79            self.passes_action,
 80            self.passes_client_ip,
 81            self.passes_app,
 82            self.passes_model,
 83        ]
 84        for checker in checks:
 85            result = checker(request, event)
 86            if result is None:
 87                continue
 88            LOGGER.debug(
 89                "Event matcher check result",
 90                checker=checker.__name__,
 91                result=result,
 92            )
 93            matches.append(result)
 94        passing = all(x.passing for x in matches)
 95        messages = chain(*[x.messages for x in matches])
 96        result = PolicyResult(passing, *messages)
 97        result.source_results = matches
 98        return result
 99
100    def passes_query(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
101        """Check AKQL query"""
102        if not self.query:
103            return None
104        from authentik.events.api.events import EventViewSet
105
106        class InlineSchema(BaseSchema):
107            def get_fields(self, model):
108                return EventViewSet().get_ql_fields()
109
110        print(Event.objects.filter(pk=event.pk))
111        qs = apply_search(Event.objects.filter(pk=event.pk), self.query, InlineSchema)
112        return PolicyResult(qs.exists(), "Query matched.")
113
114    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
115        """Check if `self.action` matches"""
116        if self.action is None:
117            return None
118        return PolicyResult(self.action == event.action, "Action matched.")
119
120    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
121        """Check if `self.client_ip` matches"""
122        if self.client_ip is None:
123            return None
124        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")
125
126    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
127        """Check if `self.app` matches"""
128        if self.app is None:
129            return None
130        return PolicyResult(self.app == event.app, "App matched.")
131
132    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
133        """Check if `self.model` is set, and pass if it matches the event's model"""
134        if self.model is None:
135            return None
136        event_model_info = event.context.get("model", {})
137        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
138        return PolicyResult(event_model == self.model, "Model matched.")
139
140    class Meta(Policy.PolicyMeta):
141        verbose_name = _("Event Matcher Policy")
142        verbose_name_plural = _("Event Matcher Policies")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class EventMatcherPolicy(authentik.policies.models.Policy):
 20class EventMatcherPolicy(Policy):
 21    """Passes when Event matches selected criteria."""
 22
 23    query = models.TextField(
 24        null=True,
 25        default=None,
 26    )
 27    action = models.TextField(
 28        choices=EventAction.choices,
 29        null=True,
 30        default=None,
 31        help_text=_(
 32            "Match created events with this action type. "
 33            "When left empty, all action types will be matched."
 34        ),
 35    )
 36    app = models.TextField(
 37        null=True,
 38        default=None,
 39        help_text=_(
 40            "Match events created by selected application. "
 41            "When left empty, all applications are matched."
 42        ),
 43    )
 44    model = models.TextField(
 45        null=True,
 46        default=None,
 47        help_text=_(
 48            "Match events created by selected model. "
 49            "When left empty, all models are matched. When an app is selected, "
 50            "all the application's models are matched."
 51        ),
 52    )
 53    client_ip = models.TextField(
 54        null=True,
 55        default=None,
 56        help_text=_(
 57            "Matches Event's Client IP (strict matching, "
 58            "for network matching use an Expression Policy)"
 59        ),
 60    )
 61
 62    @property
 63    def serializer(self) -> type[BaseSerializer]:
 64        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer
 65
 66        return EventMatcherPolicySerializer
 67
 68    @property
 69    def component(self) -> str:
 70        return "ak-policy-event-matcher-form"
 71
 72    def passes(self, request: PolicyRequest) -> PolicyResult:
 73        if "event" not in request.context:
 74            return PolicyResult(False)
 75        event: Event = request.context["event"]
 76        matches: list[PolicyResult] = []
 77        messages = []
 78        checks = [
 79            self.passes_query,
 80            self.passes_action,
 81            self.passes_client_ip,
 82            self.passes_app,
 83            self.passes_model,
 84        ]
 85        for checker in checks:
 86            result = checker(request, event)
 87            if result is None:
 88                continue
 89            LOGGER.debug(
 90                "Event matcher check result",
 91                checker=checker.__name__,
 92                result=result,
 93            )
 94            matches.append(result)
 95        passing = all(x.passing for x in matches)
 96        messages = chain(*[x.messages for x in matches])
 97        result = PolicyResult(passing, *messages)
 98        result.source_results = matches
 99        return result
100
101    def passes_query(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
102        """Check AKQL query"""
103        if not self.query:
104            return None
105        from authentik.events.api.events import EventViewSet
106
107        class InlineSchema(BaseSchema):
108            def get_fields(self, model):
109                return EventViewSet().get_ql_fields()
110
111        print(Event.objects.filter(pk=event.pk))
112        qs = apply_search(Event.objects.filter(pk=event.pk), self.query, InlineSchema)
113        return PolicyResult(qs.exists(), "Query matched.")
114
115    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
116        """Check if `self.action` matches"""
117        if self.action is None:
118            return None
119        return PolicyResult(self.action == event.action, "Action matched.")
120
121    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
122        """Check if `self.client_ip` matches"""
123        if self.client_ip is None:
124            return None
125        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")
126
127    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
128        """Check if `self.app` matches"""
129        if self.app is None:
130            return None
131        return PolicyResult(self.app == event.app, "App matched.")
132
133    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
134        """Check if `self.model` is set, and pass if it matches the event's model"""
135        if self.model is None:
136            return None
137        event_model_info = event.context.get("model", {})
138        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
139        return PolicyResult(event_model == self.model, "Model matched.")
140
141    class Meta(Policy.PolicyMeta):
142        verbose_name = _("Event Matcher Policy")
143        verbose_name_plural = _("Event Matcher Policies")

Passes when Event matches selected criteria.

def query(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def action(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def app(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def model(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_ip(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
62    @property
63    def serializer(self) -> type[BaseSerializer]:
64        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer
65
66        return EventMatcherPolicySerializer

Get serializer for this model

component: str
68    @property
69    def component(self) -> str:
70        return "ak-policy-event-matcher-form"

Return component used to edit this object

72    def passes(self, request: PolicyRequest) -> PolicyResult:
73        if "event" not in request.context:
74            return PolicyResult(False)
75        event: Event = request.context["event"]
76        matches: list[PolicyResult] = []
77        messages = []
78        checks = [
79            self.passes_query,
80            self.passes_action,
81            self.passes_client_ip,
82            self.passes_app,
83            self.passes_model,
84        ]
85        for checker in checks:
86            result = checker(request, event)
87            if result is None:
88                continue
89            LOGGER.debug(
90                "Event matcher check result",
91                checker=checker.__name__,
92                result=result,
93            )
94            matches.append(result)
95        passing = all(x.passing for x in matches)
96        messages = chain(*[x.messages for x in matches])
97        result = PolicyResult(passing, *messages)
98        result.source_results = matches
99        return result

Check if request passes this policy

101    def passes_query(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
102        """Check AKQL query"""
103        if not self.query:
104            return None
105        from authentik.events.api.events import EventViewSet
106
107        class InlineSchema(BaseSchema):
108            def get_fields(self, model):
109                return EventViewSet().get_ql_fields()
110
111        print(Event.objects.filter(pk=event.pk))
112        qs = apply_search(Event.objects.filter(pk=event.pk), self.query, InlineSchema)
113        return PolicyResult(qs.exists(), "Query matched.")

Check AKQL query

def passes_action( self, request: authentik.policies.types.PolicyRequest, event: authentik.events.models.Event) -> authentik.policies.types.PolicyResult | None:
115    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
116        """Check if `self.action` matches"""
117        if self.action is None:
118            return None
119        return PolicyResult(self.action == event.action, "Action matched.")

Check if self.action matches

def passes_client_ip( self, request: authentik.policies.types.PolicyRequest, event: authentik.events.models.Event) -> authentik.policies.types.PolicyResult | None:
121    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
122        """Check if `self.client_ip` matches"""
123        if self.client_ip is None:
124            return None
125        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")

Check if self.client_ip matches

127    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
128        """Check if `self.app` matches"""
129        if self.app is None:
130            return None
131        return PolicyResult(self.app == event.app, "App matched.")

Check if self.app matches

133    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
134        """Check if `self.model` is set, and pass if it matches the event's model"""
135        if self.model is None:
136            return None
137        event_model_info = event.context.get("model", {})
138        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
139        return PolicyResult(event_model == self.model, "Model matched.")

Check if self.model is set, and pass if it matches the event's model

def get_action_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

policy_ptr_id
policy_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class EventMatcherPolicy.DoesNotExist(authentik.policies.models.Policy.DoesNotExist):

The requested object does not exist

class EventMatcherPolicy.MultipleObjectsReturned(authentik.policies.models.Policy.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.