authentik.policies.reputation.models

authentik reputation request policy

  1"""authentik reputation request policy"""
  2
  3from datetime import timedelta
  4from uuid import uuid4
  5
  6from django.db import models
  7from django.db.models import Sum
  8from django.db.models.query_utils import Q
  9from django.utils.timezone import now
 10from django.utils.translation import gettext as _
 11from rest_framework.serializers import BaseSerializer
 12from structlog import get_logger
 13
 14from authentik.core.models import ExpiringModel
 15from authentik.lib.config import CONFIG
 16from authentik.lib.models import InternallyManagedMixin, SerializerModel
 17from authentik.policies.models import Policy
 18from authentik.policies.types import PolicyRequest, PolicyResult
 19from authentik.root.middleware import ClientIPMiddleware
 20
 21LOGGER = get_logger()
 22
 23
 24def reputation_expiry():
 25    """Return the expiry time for a reputation entry: now plus ``reputation.expiry`` seconds"""
 26    return now() + timedelta(seconds=CONFIG.get_int("reputation.expiry"))
 27
 28
 29class ReputationPolicy(Policy):
 30    """Pass when the summed score for the request IP and/or username is at or below the threshold"""
 31
 32    check_ip = models.BooleanField(default=True)  # include scores matching the client IP
 33    check_username = models.BooleanField(default=True)  # include scores matching the username
 34    threshold = models.IntegerField(default=-5)  # policy passes when summed score <= this value
 35
 36    @property
 37    def serializer(self) -> type[BaseSerializer]:  # serializer class for this model
 38        from authentik.policies.reputation.api import ReputationPolicySerializer  # local import — presumably avoids a circular import; confirm
 39
 40        return ReputationPolicySerializer
 41
 42    @property
 43    def component(self) -> str:  # component used to edit this object
 44        return "ak-policy-reputation-form"
 45
 46    def passes(self, request: PolicyRequest) -> PolicyResult:  # check if request passes this policy
 47        remote_ip = ClientIPMiddleware.get_client_ip(request.http_request)
 48        query = Q()  # NOTE(review): stays empty when both checks are off — confirm what filter() matches then
 49        if self.check_ip:
 50            query |= Q(ip=remote_ip)
 51        if self.check_username:
 52            query |= Q(identifier=request.user.username)
 53        score = (  # sum of all matching scores; `or 0` substitutes 0 when the aggregate is None/falsy
 54            Reputation.objects.filter(query).aggregate(total_score=Sum("score"))["total_score"] or 0
 55        )
 56        passing = score <= self.threshold  # at or below the threshold counts as passing
 57        LOGGER.debug(
 58            "Score for user",
 59            username=request.user.username,
 60            remote_ip=remote_ip,
 61            score=score,
 62            passing=passing,
 63        )
 64        return PolicyResult(bool(passing))
 65
 66    class Meta(Policy.PolicyMeta):
 67        verbose_name = _("Reputation Policy")
 68        verbose_name_plural = _("Reputation Policies")
 69
 70
 71class Reputation(InternallyManagedMixin, ExpiringModel, SerializerModel):
 72    """Reputation score recorded for an identifier and IP pair."""
 73
 74    reputation_uuid = models.UUIDField(primary_key=True, unique=True, default=uuid4)
 75
 76    identifier = models.TextField()  # compared against request.user.username by ReputationPolicy
 77    ip = models.GenericIPAddressField()  # compared against the client IP by ReputationPolicy
 78    ip_geo_data = models.JSONField(default=dict)
 79    ip_asn_data = models.JSONField(default=dict)
 80    score = models.BigIntegerField(default=0)  # summed across matching rows by ReputationPolicy
 81
 82    expires = models.DateTimeField(default=reputation_expiry)  # default comes from reputation_expiry()
 83
 84    updated = models.DateTimeField(auto_now=True)
 85
 86    @property
 87    def serializer(self) -> type[BaseSerializer]:  # serializer class for this model
 88        from authentik.policies.reputation.api import ReputationSerializer  # local import — presumably avoids a circular import; confirm
 89
 90        return ReputationSerializer
 91
 92    def __str__(self) -> str:
 93        return f"Reputation {self.identifier}/{self.ip} @ {self.score}"
 94
 95    class Meta:
 96        verbose_name = _("Reputation Score")
 97        verbose_name_plural = _("Reputation Scores")
 98        unique_together = ("identifier", "ip")  # one row per identifier/IP pair
 99        indexes = ExpiringModel.Meta.indexes + [  # extends the indexes declared on ExpiringModel
100            models.Index(fields=["identifier"]),
101            models.Index(fields=["ip"]),
102            models.Index(fields=["ip", "identifier"]),
103        ]
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
def reputation_expiry():
25def reputation_expiry():
26    """Return the expiry time for a reputation entry: now plus ``reputation.expiry`` seconds"""
27    return now() + timedelta(seconds=CONFIG.get_int("reputation.expiry"))

Return the expiry time for a reputation entry: the current time plus the `reputation.expiry` setting, in seconds.

class ReputationPolicy(authentik.policies.models.Policy):
30class ReputationPolicy(Policy):
31    """Pass when the summed score for the request IP and/or username is at or below the threshold"""
32
33    check_ip = models.BooleanField(default=True)  # include scores matching the client IP
34    check_username = models.BooleanField(default=True)  # include scores matching the username
35    threshold = models.IntegerField(default=-5)  # policy passes when summed score <= this value
36
37    @property
38    def serializer(self) -> type[BaseSerializer]:  # serializer class for this model
39        from authentik.policies.reputation.api import ReputationPolicySerializer  # local import — presumably avoids a circular import; confirm
40
41        return ReputationPolicySerializer
42
43    @property
44    def component(self) -> str:  # component used to edit this object
45        return "ak-policy-reputation-form"
46
47    def passes(self, request: PolicyRequest) -> PolicyResult:  # check if request passes this policy
48        remote_ip = ClientIPMiddleware.get_client_ip(request.http_request)
49        query = Q()  # NOTE(review): stays empty when both checks are off — confirm what filter() matches then
50        if self.check_ip:
51            query |= Q(ip=remote_ip)
52        if self.check_username:
53            query |= Q(identifier=request.user.username)
54        score = (  # sum of all matching scores; `or 0` substitutes 0 when the aggregate is None/falsy
55            Reputation.objects.filter(query).aggregate(total_score=Sum("score"))["total_score"] or 0
56        )
57        passing = score <= self.threshold  # at or below the threshold counts as passing
58        LOGGER.debug(
59            "Score for user",
60            username=request.user.username,
61            remote_ip=remote_ip,
62            score=score,
63            passing=passing,
64        )
65        return PolicyResult(bool(passing))
66
67    class Meta(Policy.PolicyMeta):
68        verbose_name = _("Reputation Policy")
69        verbose_name_plural = _("Reputation Policies")

Return true if the summed reputation score for the request IP and/or target username is at or below the configured threshold.

def check_ip(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def check_username(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def threshold(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
37    @property
38    def serializer(self) -> type[BaseSerializer]:  # serializer class for this model
39        from authentik.policies.reputation.api import ReputationPolicySerializer  # local import — presumably avoids a circular import; confirm
40
41        return ReputationPolicySerializer

Get serializer for this model

component: str
43    @property
44    def component(self) -> str:  # component used to edit this object
45        return "ak-policy-reputation-form"

Return component used to edit this object

47    def passes(self, request: PolicyRequest) -> PolicyResult:  # check if request passes this policy
48        remote_ip = ClientIPMiddleware.get_client_ip(request.http_request)
49        query = Q()  # NOTE(review): stays empty when both checks are off — confirm what filter() matches then
50        if self.check_ip:
51            query |= Q(ip=remote_ip)
52        if self.check_username:
53            query |= Q(identifier=request.user.username)
54        score = (  # sum of all matching scores; `or 0` substitutes 0 when the aggregate is None/falsy
55            Reputation.objects.filter(query).aggregate(total_score=Sum("score"))["total_score"] or 0
56        )
57        passing = score <= self.threshold  # at or below the threshold counts as passing
58        LOGGER.debug(
59            "Score for user",
60            username=request.user.username,
61            remote_ip=remote_ip,
62            score=score,
63            passing=passing,
64        )
65        return PolicyResult(bool(passing))

Check if request passes this policy

policy_ptr_id
policy_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class ReputationPolicy.DoesNotExist(authentik.policies.models.Policy.DoesNotExist):

The requested object does not exist

class ReputationPolicy.MultipleObjectsReturned(authentik.policies.models.Policy.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

 72class Reputation(InternallyManagedMixin, ExpiringModel, SerializerModel):
 73    """Reputation score recorded for an identifier and IP pair."""
 74
 75    reputation_uuid = models.UUIDField(primary_key=True, unique=True, default=uuid4)
 76
 77    identifier = models.TextField()  # compared against request.user.username by ReputationPolicy
 78    ip = models.GenericIPAddressField()  # compared against the client IP by ReputationPolicy
 79    ip_geo_data = models.JSONField(default=dict)
 80    ip_asn_data = models.JSONField(default=dict)
 81    score = models.BigIntegerField(default=0)  # summed across matching rows by ReputationPolicy
 82
 83    expires = models.DateTimeField(default=reputation_expiry)  # default comes from reputation_expiry()
 84
 85    updated = models.DateTimeField(auto_now=True)
 86
 87    @property
 88    def serializer(self) -> type[BaseSerializer]:  # serializer class for this model
 89        from authentik.policies.reputation.api import ReputationSerializer  # local import — presumably avoids a circular import; confirm
 90
 91        return ReputationSerializer
 92
 93    def __str__(self) -> str:
 94        return f"Reputation {self.identifier}/{self.ip} @ {self.score}"
 95
 96    class Meta:
 97        verbose_name = _("Reputation Score")
 98        verbose_name_plural = _("Reputation Scores")
 99        unique_together = ("identifier", "ip")  # one row per identifier/IP pair
100        indexes = ExpiringModel.Meta.indexes + [  # extends the indexes declared on ExpiringModel
101            models.Index(fields=["identifier"]),
102            models.Index(fields=["ip"]),
103            models.Index(fields=["ip", "identifier"]),
104        ]

Reputation score recorded for a user identifier and IP pair.

def reputation_uuid(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def identifier(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def ip(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def ip_geo_data(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def ip_asn_data(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def score(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def expires(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def updated(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
87    @property
88    def serializer(self) -> type[BaseSerializer]:  # serializer class for this model
89        from authentik.policies.reputation.api import ReputationSerializer  # local import — presumably avoids a circular import; confirm
90
91        return ReputationSerializer

Get serializer for this model

def expiring(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def get_next_by_expires(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_expires(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_next_by_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_updated(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

class Reputation.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class Reputation.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.