authentik.policies.reputation.models
authentik reputation request policy
1"""authentik reputation request policy""" 2 3from datetime import timedelta 4from uuid import uuid4 5 6from django.db import models 7from django.db.models import Sum 8from django.db.models.query_utils import Q 9from django.utils.timezone import now 10from django.utils.translation import gettext as _ 11from rest_framework.serializers import BaseSerializer 12from structlog import get_logger 13 14from authentik.core.models import ExpiringModel 15from authentik.lib.config import CONFIG 16from authentik.lib.models import InternallyManagedMixin, SerializerModel 17from authentik.policies.models import Policy 18from authentik.policies.types import PolicyRequest, PolicyResult 19from authentik.root.middleware import ClientIPMiddleware 20 21LOGGER = get_logger() 22 23 24def reputation_expiry(): 25 """Reputation expiry""" 26 return now() + timedelta(seconds=CONFIG.get_int("reputation.expiry")) 27 28 29class ReputationPolicy(Policy): 30 """Return true if request IP/target username's score is below a certain threshold""" 31 32 check_ip = models.BooleanField(default=True) 33 check_username = models.BooleanField(default=True) 34 threshold = models.IntegerField(default=-5) 35 36 @property 37 def serializer(self) -> type[BaseSerializer]: 38 from authentik.policies.reputation.api import ReputationPolicySerializer 39 40 return ReputationPolicySerializer 41 42 @property 43 def component(self) -> str: 44 return "ak-policy-reputation-form" 45 46 def passes(self, request: PolicyRequest) -> PolicyResult: 47 remote_ip = ClientIPMiddleware.get_client_ip(request.http_request) 48 query = Q() 49 if self.check_ip: 50 query |= Q(ip=remote_ip) 51 if self.check_username: 52 query |= Q(identifier=request.user.username) 53 score = ( 54 Reputation.objects.filter(query).aggregate(total_score=Sum("score"))["total_score"] or 0 55 ) 56 passing = score <= self.threshold 57 LOGGER.debug( 58 "Score for user", 59 username=request.user.username, 60 remote_ip=remote_ip, 61 score=score, 62 passing=passing, 63 ) 64 return PolicyResult(bool(passing)) 65 66 class Meta(Policy.PolicyMeta): 67 verbose_name = _("Reputation Policy") 68 verbose_name_plural = _("Reputation Policies") 69 70 71class Reputation(InternallyManagedMixin, ExpiringModel, SerializerModel): 72 """Reputation for user and or IP.""" 73 74 reputation_uuid = models.UUIDField(primary_key=True, unique=True, default=uuid4) 75 76 identifier = models.TextField() 77 ip = models.GenericIPAddressField() 78 ip_geo_data = models.JSONField(default=dict) 79 ip_asn_data = models.JSONField(default=dict) 80 score = models.BigIntegerField(default=0) 81 82 expires = models.DateTimeField(default=reputation_expiry) 83 84 updated = models.DateTimeField(auto_now=True) 85 86 @property 87 def serializer(self) -> type[BaseSerializer]: 88 from authentik.policies.reputation.api import ReputationSerializer 89 90 return ReputationSerializer 91 92 def __str__(self) -> str: 93 return f"Reputation {self.identifier}/{self.ip} @ {self.score}" 94 95 class Meta: 96 verbose_name = _("Reputation Score") 97 verbose_name_plural = _("Reputation Scores") 98 unique_together = ("identifier", "ip") 99 indexes = ExpiringModel.Meta.indexes + [ 100 models.Index(fields=["identifier"]), 101 models.Index(fields=["ip"]), 102 models.Index(fields=["ip", "identifier"]), 103 ]
25def reputation_expiry(): 26 """Reputation expiry""" 27 return now() + timedelta(seconds=CONFIG.get_int("reputation.expiry"))
Reputation expiry
30class ReputationPolicy(Policy): 31 """Return true if request IP/target username's score is below a certain threshold""" 32 33 check_ip = models.BooleanField(default=True) 34 check_username = models.BooleanField(default=True) 35 threshold = models.IntegerField(default=-5) 36 37 @property 38 def serializer(self) -> type[BaseSerializer]: 39 from authentik.policies.reputation.api import ReputationPolicySerializer 40 41 return ReputationPolicySerializer 42 43 @property 44 def component(self) -> str: 45 return "ak-policy-reputation-form" 46 47 def passes(self, request: PolicyRequest) -> PolicyResult: 48 remote_ip = ClientIPMiddleware.get_client_ip(request.http_request) 49 query = Q() 50 if self.check_ip: 51 query |= Q(ip=remote_ip) 52 if self.check_username: 53 query |= Q(identifier=request.user.username) 54 score = ( 55 Reputation.objects.filter(query).aggregate(total_score=Sum("score"))["total_score"] or 0 56 ) 57 passing = score <= self.threshold 58 LOGGER.debug( 59 "Score for user", 60 username=request.user.username, 61 remote_ip=remote_ip, 62 score=score, 63 passing=passing, 64 ) 65 return PolicyResult(bool(passing)) 66 67 class Meta(Policy.PolicyMeta): 68 verbose_name = _("Reputation Policy") 69 verbose_name_plural = _("Reputation Policies")
Return true if request IP/target username's score is below a certain threshold
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
37 @property 38 def serializer(self) -> type[BaseSerializer]: 39 from authentik.policies.reputation.api import ReputationPolicySerializer 40 41 return ReputationPolicySerializer
Get serializer for this model
47 def passes(self, request: PolicyRequest) -> PolicyResult: 48 remote_ip = ClientIPMiddleware.get_client_ip(request.http_request) 49 query = Q() 50 if self.check_ip: 51 query |= Q(ip=remote_ip) 52 if self.check_username: 53 query |= Q(identifier=request.user.username) 54 score = ( 55 Reputation.objects.filter(query).aggregate(total_score=Sum("score"))["total_score"] or 0 56 ) 57 passing = score <= self.threshold 58 LOGGER.debug( 59 "Score for user", 60 username=request.user.username, 61 remote_ip=remote_ip, 62 score=score, 63 passing=passing, 64 ) 65 return PolicyResult(bool(passing))
Check if request passes this policy
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.policies.models.Policy
- policy_uuid
- name
- execution_logging
- objects
- PolicyMeta
- created
- last_updated
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- bindings
- dummypolicy
- eventmatcherpolicy
- passwordexpirypolicy
- reputationpolicy
- expressionpolicy
- geoippolicy
- promptstage_set
- passwordpolicy
- uniquepasswordpolicy
The requested object does not exist
The query returned multiple objects when only one was expected.
72class Reputation(InternallyManagedMixin, ExpiringModel, SerializerModel): 73 """Reputation for user and or IP.""" 74 75 reputation_uuid = models.UUIDField(primary_key=True, unique=True, default=uuid4) 76 77 identifier = models.TextField() 78 ip = models.GenericIPAddressField() 79 ip_geo_data = models.JSONField(default=dict) 80 ip_asn_data = models.JSONField(default=dict) 81 score = models.BigIntegerField(default=0) 82 83 expires = models.DateTimeField(default=reputation_expiry) 84 85 updated = models.DateTimeField(auto_now=True) 86 87 @property 88 def serializer(self) -> type[BaseSerializer]: 89 from authentik.policies.reputation.api import ReputationSerializer 90 91 return ReputationSerializer 92 93 def __str__(self) -> str: 94 return f"Reputation {self.identifier}/{self.ip} @ {self.score}" 95 96 class Meta: 97 verbose_name = _("Reputation Score") 98 verbose_name_plural = _("Reputation Scores") 99 unique_together = ("identifier", "ip") 100 indexes = ExpiringModel.Meta.indexes + [ 101 models.Index(fields=["identifier"]), 102 models.Index(fields=["ip"]), 103 models.Index(fields=["ip", "identifier"]), 104 ]
Reputation for user and or IP.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
87 @property 88 def serializer(self) -> type[BaseSerializer]: 89 from authentik.policies.reputation.api import ReputationSerializer 90 91 return ReputationSerializer
Get serializer for this model
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Inherited Members
The requested object does not exist
The query returned multiple objects when only one was expected.