authentik.policies.event_matcher.models

Event Matcher models

  1"""Event Matcher models"""
  2
  3from itertools import chain
  4
  5from django.db import models
  6from django.utils.translation import gettext as _
  7from rest_framework.serializers import BaseSerializer
  8from structlog.stdlib import get_logger
  9
 10from authentik.events.models import Event, EventAction
 11from authentik.policies.models import Policy
 12from authentik.policies.types import PolicyRequest, PolicyResult
 13
 14LOGGER = get_logger()
 15
 16
 17class EventMatcherPolicy(Policy):
 18    """Passes when Event matches selected criteria."""
 19
 20    action = models.TextField(
 21        choices=EventAction.choices,
 22        null=True,
 23        default=None,
 24        help_text=_(
 25            "Match created events with this action type. "
 26            "When left empty, all action types will be matched."
 27        ),
 28    )
 29    app = models.TextField(
 30        null=True,
 31        default=None,
 32        help_text=_(
 33            "Match events created by selected application. "
 34            "When left empty, all applications are matched."
 35        ),
 36    )
 37    model = models.TextField(
 38        null=True,
 39        default=None,
 40        help_text=_(
 41            "Match events created by selected model. "
 42            "When left empty, all models are matched. When an app is selected, "
 43            "all the application's models are matched."
 44        ),
 45    )
 46    client_ip = models.TextField(
 47        null=True,
 48        default=None,
 49        help_text=_(
 50            "Matches Event's Client IP (strict matching, "
 51            "for network matching use an Expression Policy)"
 52        ),
 53    )
 54
 55    @property
 56    def serializer(self) -> type[BaseSerializer]:
 57        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer
 58
 59        return EventMatcherPolicySerializer
 60
 61    @property
 62    def component(self) -> str:
 63        return "ak-policy-event-matcher-form"
 64
 65    def passes(self, request: PolicyRequest) -> PolicyResult:
 66        if "event" not in request.context:
 67            return PolicyResult(False)
 68        event: Event = request.context["event"]
 69        matches: list[PolicyResult] = []
 70        messages = []
 71        checks = [
 72            self.passes_action,
 73            self.passes_client_ip,
 74            self.passes_app,
 75            self.passes_model,
 76        ]
 77        for checker in checks:
 78            result = checker(request, event)
 79            if result is None:
 80                continue
 81            LOGGER.debug(
 82                "Event matcher check result",
 83                checker=checker.__name__,
 84                result=result,
 85            )
 86            matches.append(result)
 87        passing = all(x.passing for x in matches)
 88        messages = chain(*[x.messages for x in matches])
 89        result = PolicyResult(passing, *messages)
 90        result.source_results = matches
 91        return result
 92
 93    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
 94        """Check if `self.action` matches"""
 95        if self.action is None:
 96            return None
 97        return PolicyResult(self.action == event.action, "Action matched.")
 98
 99    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
100        """Check if `self.client_ip` matches"""
101        if self.client_ip is None:
102            return None
103        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")
104
105    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
106        """Check if `self.app` matches"""
107        if self.app is None:
108            return None
109        return PolicyResult(self.app == event.app, "App matched.")
110
111    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
112        """Check if `self.model` is set, and pass if it matches the event's model"""
113        if self.model is None:
114            return None
115        event_model_info = event.context.get("model", {})
116        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
117        return PolicyResult(event_model == self.model, "Model matched.")
118
119    class Meta(Policy.PolicyMeta):
120        verbose_name = _("Event Matcher Policy")
121        verbose_name_plural = _("Event Matcher Policies")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class EventMatcherPolicy(authentik.policies.models.Policy):
 18class EventMatcherPolicy(Policy):
 19    """Passes when Event matches selected criteria."""
 20
 21    action = models.TextField(
 22        choices=EventAction.choices,
 23        null=True,
 24        default=None,
 25        help_text=_(
 26            "Match created events with this action type. "
 27            "When left empty, all action types will be matched."
 28        ),
 29    )
 30    app = models.TextField(
 31        null=True,
 32        default=None,
 33        help_text=_(
 34            "Match events created by selected application. "
 35            "When left empty, all applications are matched."
 36        ),
 37    )
 38    model = models.TextField(
 39        null=True,
 40        default=None,
 41        help_text=_(
 42            "Match events created by selected model. "
 43            "When left empty, all models are matched. When an app is selected, "
 44            "all the application's models are matched."
 45        ),
 46    )
 47    client_ip = models.TextField(
 48        null=True,
 49        default=None,
 50        help_text=_(
 51            "Matches Event's Client IP (strict matching, "
 52            "for network matching use an Expression Policy)"
 53        ),
 54    )
 55
 56    @property
 57    def serializer(self) -> type[BaseSerializer]:
 58        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer
 59
 60        return EventMatcherPolicySerializer
 61
 62    @property
 63    def component(self) -> str:
 64        return "ak-policy-event-matcher-form"
 65
 66    def passes(self, request: PolicyRequest) -> PolicyResult:
 67        if "event" not in request.context:
 68            return PolicyResult(False)
 69        event: Event = request.context["event"]
 70        matches: list[PolicyResult] = []
 71        messages = []
 72        checks = [
 73            self.passes_action,
 74            self.passes_client_ip,
 75            self.passes_app,
 76            self.passes_model,
 77        ]
 78        for checker in checks:
 79            result = checker(request, event)
 80            if result is None:
 81                continue
 82            LOGGER.debug(
 83                "Event matcher check result",
 84                checker=checker.__name__,
 85                result=result,
 86            )
 87            matches.append(result)
 88        passing = all(x.passing for x in matches)
 89        messages = chain(*[x.messages for x in matches])
 90        result = PolicyResult(passing, *messages)
 91        result.source_results = matches
 92        return result
 93
 94    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
 95        """Check if `self.action` matches"""
 96        if self.action is None:
 97            return None
 98        return PolicyResult(self.action == event.action, "Action matched.")
 99
100    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
101        """Check if `self.client_ip` matches"""
102        if self.client_ip is None:
103            return None
104        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")
105
106    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
107        """Check if `self.app` matches"""
108        if self.app is None:
109            return None
110        return PolicyResult(self.app == event.app, "App matched.")
111
112    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
113        """Check if `self.model` is set, and pass if it matches the event's model"""
114        if self.model is None:
115            return None
116        event_model_info = event.context.get("model", {})
117        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
118        return PolicyResult(event_model == self.model, "Model matched.")
119
120    class Meta(Policy.PolicyMeta):
121        verbose_name = _("Event Matcher Policy")
122        verbose_name_plural = _("Event Matcher Policies")

Passes when Event matches selected criteria.

def action(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def app(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def model(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def client_ip(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
56    @property
57    def serializer(self) -> type[BaseSerializer]:
58        from authentik.policies.event_matcher.api import EventMatcherPolicySerializer
59
60        return EventMatcherPolicySerializer

Get serializer for this model

component: str
62    @property
63    def component(self) -> str:
64        return "ak-policy-event-matcher-form"

Return component used to edit this object

66    def passes(self, request: PolicyRequest) -> PolicyResult:
67        if "event" not in request.context:
68            return PolicyResult(False)
69        event: Event = request.context["event"]
70        matches: list[PolicyResult] = []
71        messages = []
72        checks = [
73            self.passes_action,
74            self.passes_client_ip,
75            self.passes_app,
76            self.passes_model,
77        ]
78        for checker in checks:
79            result = checker(request, event)
80            if result is None:
81                continue
82            LOGGER.debug(
83                "Event matcher check result",
84                checker=checker.__name__,
85                result=result,
86            )
87            matches.append(result)
88        passing = all(x.passing for x in matches)
89        messages = chain(*[x.messages for x in matches])
90        result = PolicyResult(passing, *messages)
91        result.source_results = matches
92        return result

Check if request passes this policy

def passes_action( self, request: authentik.policies.types.PolicyRequest, event: authentik.events.models.Event) -> authentik.policies.types.PolicyResult | None:
94    def passes_action(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
95        """Check if `self.action` matches"""
96        if self.action is None:
97            return None
98        return PolicyResult(self.action == event.action, "Action matched.")

Check if self.action matches

def passes_client_ip( self, request: authentik.policies.types.PolicyRequest, event: authentik.events.models.Event) -> authentik.policies.types.PolicyResult | None:
100    def passes_client_ip(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
101        """Check if `self.client_ip` matches"""
102        if self.client_ip is None:
103            return None
104        return PolicyResult(self.client_ip == event.client_ip, "Client IP matched.")

Check if self.client_ip matches

106    def passes_app(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
107        """Check if `self.app` matches"""
108        if self.app is None:
109            return None
110        return PolicyResult(self.app == event.app, "App matched.")

Check if self.app matches

112    def passes_model(self, request: PolicyRequest, event: Event) -> PolicyResult | None:
113        """Check if `self.model` is set, and pass if it matches the event's model"""
114        if self.model is None:
115            return None
116        event_model_info = event.context.get("model", {})
117        event_model = f"{event_model_info.get('app')}.{event_model_info.get('model_name')}"
118        return PolicyResult(event_model == self.model, "Model matched.")

Check if self.model is set, and pass if it matches the event's model

def get_action_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

policy_ptr_id
policy_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class EventMatcherPolicy.DoesNotExist(authentik.policies.models.Policy.DoesNotExist):

The requested object does not exist

class EventMatcherPolicy.MultipleObjectsReturned(authentik.policies.models.Policy.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.