authentik.enterprise.policies.unique_password.models

  1from hashlib import sha1
  2
  3from django.contrib.auth.hashers import identify_hasher, make_password
  4from django.db import models
  5from django.utils.translation import gettext as _
  6from rest_framework.serializers import BaseSerializer
  7from structlog.stdlib import get_logger
  8
  9from authentik.core.models import User
 10from authentik.policies.models import Policy
 11from authentik.policies.types import PolicyRequest, PolicyResult
 12from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 13
 14LOGGER = get_logger()
 15
 16
 17class UniquePasswordPolicy(Policy):
 18    """This policy prevents users from reusing old passwords."""
 19
 20    password_field = models.TextField(
 21        default="password",
 22        help_text=_("Field key to check, field keys defined in Prompt stages are available."),
 23    )
 24
 25    # Limit on the number of previous passwords the policy evaluates
 26    # Also controls number of old passwords the system stores.
 27    num_historical_passwords = models.PositiveIntegerField(
 28        default=1,
 29        help_text=_("Number of passwords to check against."),
 30    )
 31
 32    @property
 33    def serializer(self) -> type[BaseSerializer]:
 34        from authentik.enterprise.policies.unique_password.api import UniquePasswordPolicySerializer
 35
 36        return UniquePasswordPolicySerializer
 37
 38    @property
 39    def component(self) -> str:
 40        return "ak-policy-password-uniqueness-form"
 41
 42    def passes(self, request: PolicyRequest) -> PolicyResult:
 43        from authentik.enterprise.policies.unique_password.models import UserPasswordHistory
 44
 45        password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
 46            self.password_field, request.context.get(self.password_field)
 47        )
 48        if not password:
 49            LOGGER.warning(
 50                "Password field not found in request when checking UniquePasswordPolicy",
 51                field=self.password_field,
 52                fields=request.context.keys(),
 53            )
 54            return PolicyResult(False, _("Password not set in context"))
 55        password = str(password)
 56
 57        if not self.num_historical_passwords:
 58            # Policy not configured to check against any passwords
 59            return PolicyResult(True)
 60
 61        num_to_check = self.num_historical_passwords
 62        password_history = UserPasswordHistory.objects.filter(user=request.user).order_by(
 63            "-created_at"
 64        )[:num_to_check]
 65
 66        if not password_history:
 67            return PolicyResult(True)
 68
 69        for record in password_history:
 70            if not record.old_password:
 71                continue
 72
 73            if self._passwords_match(new_password=password, old_password=record.old_password):
 74                # Return on first match. Authentik does not consider timing attacks
 75                # on old passwords to be an attack surface.
 76                return PolicyResult(
 77                    False,
 78                    _("This password has been used previously. Please choose a different one."),
 79                )
 80
 81        return PolicyResult(True)
 82
 83    def _passwords_match(self, *, new_password: str, old_password: str) -> bool:
 84        try:
 85            hasher = identify_hasher(old_password)
 86        except ValueError:
 87            LOGGER.warning(
 88                "Skipping password; could not load hash algorithm",
 89            )
 90            return False
 91
 92        return hasher.verify(new_password, old_password)
 93
 94    @classmethod
 95    def is_in_use(cls):
 96        """Check if any UniquePasswordPolicy is in use, either through policy bindings
 97        or direct attachment to a PromptStage.
 98
 99        Returns:
100            bool: True if any policy is in use, False otherwise
101        """
102        from authentik.policies.models import PolicyBinding
103
104        # Check if any policy is in use through bindings
105        if PolicyBinding.in_use.for_policy(cls).exists():
106            return True
107
108        # Check if any policy is attached to a PromptStage
109        if cls.objects.filter(promptstage__isnull=False).exists():
110            return True
111
112        return False
113
114    class Meta(Policy.PolicyMeta):
115        verbose_name = _("Password Uniqueness Policy")
116        verbose_name_plural = _("Password Uniqueness Policies")
117
118
119class UserPasswordHistory(models.Model):
120    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="old_passwords")
121    # Mimic's column type of AbstractBaseUser.password
122    old_password = models.CharField(max_length=128)
123    created_at = models.DateTimeField(auto_now_add=True)
124
125    hibp_prefix_sha1 = models.CharField(max_length=5)
126    hibp_pw_hash = models.TextField()
127
128    class Meta:
129        verbose_name = _("User Password History")
130
131    def __str__(self) -> str:
132        timestamp = f"{self.created_at:%Y/%m/%d %X}" if self.created_at else "N/A"
133        return f"Previous Password (user: {self.user_id}, recorded: {timestamp})"
134
135    @classmethod
136    def create_for_user(cls, user: User, password: str):
137        # To check users' passwords against Have I been Pwned, we need the first 5 chars
138        # of the password hashed with SHA1 without a salt...
139        pw_hash_sha1 = sha1(password.encode("utf-8")).hexdigest()  # nosec
140        # ...however that'll give us a list of hashes from HIBP, and to compare that we still
141        # need a full unsalted SHA1 of the password. We don't want to save that directly in
142        # the database, so we hash that SHA1 again with a modern hashing alg,
143        # and then when we check users' passwords against HIBP we can use `check_password`
144        # which will take care of this.
145        hibp_hash_hash = make_password(pw_hash_sha1)
146        return cls.objects.create(
147            user=user,
148            old_password=password,
149            hibp_prefix_sha1=pw_hash_sha1[:5],
150            hibp_pw_hash=hibp_hash_hash,
151        )
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class UniquePasswordPolicy(authentik.policies.models.Policy):
 18class UniquePasswordPolicy(Policy):
 19    """This policy prevents users from reusing old passwords."""
 20
 21    password_field = models.TextField(
 22        default="password",
 23        help_text=_("Field key to check, field keys defined in Prompt stages are available."),
 24    )
 25
 26    # Limit on the number of previous passwords the policy evaluates
 27    # Also controls number of old passwords the system stores.
 28    num_historical_passwords = models.PositiveIntegerField(
 29        default=1,
 30        help_text=_("Number of passwords to check against."),
 31    )
 32
 33    @property
 34    def serializer(self) -> type[BaseSerializer]:
 35        from authentik.enterprise.policies.unique_password.api import UniquePasswordPolicySerializer
 36
 37        return UniquePasswordPolicySerializer
 38
 39    @property
 40    def component(self) -> str:
 41        return "ak-policy-password-uniqueness-form"
 42
 43    def passes(self, request: PolicyRequest) -> PolicyResult:
 44        from authentik.enterprise.policies.unique_password.models import UserPasswordHistory
 45
 46        password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
 47            self.password_field, request.context.get(self.password_field)
 48        )
 49        if not password:
 50            LOGGER.warning(
 51                "Password field not found in request when checking UniquePasswordPolicy",
 52                field=self.password_field,
 53                fields=request.context.keys(),
 54            )
 55            return PolicyResult(False, _("Password not set in context"))
 56        password = str(password)
 57
 58        if not self.num_historical_passwords:
 59            # Policy not configured to check against any passwords
 60            return PolicyResult(True)
 61
 62        num_to_check = self.num_historical_passwords
 63        password_history = UserPasswordHistory.objects.filter(user=request.user).order_by(
 64            "-created_at"
 65        )[:num_to_check]
 66
 67        if not password_history:
 68            return PolicyResult(True)
 69
 70        for record in password_history:
 71            if not record.old_password:
 72                continue
 73
 74            if self._passwords_match(new_password=password, old_password=record.old_password):
 75                # Return on first match. Authentik does not consider timing attacks
 76                # on old passwords to be an attack surface.
 77                return PolicyResult(
 78                    False,
 79                    _("This password has been used previously. Please choose a different one."),
 80                )
 81
 82        return PolicyResult(True)
 83
 84    def _passwords_match(self, *, new_password: str, old_password: str) -> bool:
 85        try:
 86            hasher = identify_hasher(old_password)
 87        except ValueError:
 88            LOGGER.warning(
 89                "Skipping password; could not load hash algorithm",
 90            )
 91            return False
 92
 93        return hasher.verify(new_password, old_password)
 94
 95    @classmethod
 96    def is_in_use(cls):
 97        """Check if any UniquePasswordPolicy is in use, either through policy bindings
 98        or direct attachment to a PromptStage.
 99
100        Returns:
101            bool: True if any policy is in use, False otherwise
102        """
103        from authentik.policies.models import PolicyBinding
104
105        # Check if any policy is in use through bindings
106        if PolicyBinding.in_use.for_policy(cls).exists():
107            return True
108
109        # Check if any policy is attached to a PromptStage
110        if cls.objects.filter(promptstage__isnull=False).exists():
111            return True
112
113        return False
114
115    class Meta(Policy.PolicyMeta):
116        verbose_name = _("Password Uniqueness Policy")
117        verbose_name_plural = _("Password Uniqueness Policies")

This policy prevents users from reusing old passwords.

def password_field(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def num_historical_passwords(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
33    @property
34    def serializer(self) -> type[BaseSerializer]:
35        from authentik.enterprise.policies.unique_password.api import UniquePasswordPolicySerializer
36
37        return UniquePasswordPolicySerializer

Get serializer for this model

component: str
39    @property
40    def component(self) -> str:
41        return "ak-policy-password-uniqueness-form"

Return component used to edit this object

43    def passes(self, request: PolicyRequest) -> PolicyResult:
44        from authentik.enterprise.policies.unique_password.models import UserPasswordHistory
45
46        password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
47            self.password_field, request.context.get(self.password_field)
48        )
49        if not password:
50            LOGGER.warning(
51                "Password field not found in request when checking UniquePasswordPolicy",
52                field=self.password_field,
53                fields=request.context.keys(),
54            )
55            return PolicyResult(False, _("Password not set in context"))
56        password = str(password)
57
58        if not self.num_historical_passwords:
59            # Policy not configured to check against any passwords
60            return PolicyResult(True)
61
62        num_to_check = self.num_historical_passwords
63        password_history = UserPasswordHistory.objects.filter(user=request.user).order_by(
64            "-created_at"
65        )[:num_to_check]
66
67        if not password_history:
68            return PolicyResult(True)
69
70        for record in password_history:
71            if not record.old_password:
72                continue
73
74            if self._passwords_match(new_password=password, old_password=record.old_password):
75                # Return on first match. Authentik does not consider timing attacks
76                # on old passwords to be an attack surface.
77                return PolicyResult(
78                    False,
79                    _("This password has been used previously. Please choose a different one."),
80                )
81
82        return PolicyResult(True)

Check if request passes this policy

@classmethod
def is_in_use(cls):
 95    @classmethod
 96    def is_in_use(cls):
 97        """Check if any UniquePasswordPolicy is in use, either through policy bindings
 98        or direct attachment to a PromptStage.
 99
100        Returns:
101            bool: True if any policy is in use, False otherwise
102        """
103        from authentik.policies.models import PolicyBinding
104
105        # Check if any policy is in use through bindings
106        if PolicyBinding.in_use.for_policy(cls).exists():
107            return True
108
109        # Check if any policy is attached to a PromptStage
110        if cls.objects.filter(promptstage__isnull=False).exists():
111            return True
112
113        return False

Check if any UniquePasswordPolicy is in use, either through policy bindings or direct attachment to a PromptStage.

Returns: bool: True if any policy is in use, False otherwise

policy_ptr_id
policy_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class UniquePasswordPolicy.DoesNotExist(authentik.policies.models.Policy.DoesNotExist):

The requested object does not exist

class UniquePasswordPolicy.MultipleObjectsReturned(authentik.policies.models.Policy.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.

class UserPasswordHistory(django.db.models.base.Model):
120class UserPasswordHistory(models.Model):
121    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="old_passwords")
122    # Mimic's column type of AbstractBaseUser.password
123    old_password = models.CharField(max_length=128)
124    created_at = models.DateTimeField(auto_now_add=True)
125
126    hibp_prefix_sha1 = models.CharField(max_length=5)
127    hibp_pw_hash = models.TextField()
128
129    class Meta:
130        verbose_name = _("User Password History")
131
132    def __str__(self) -> str:
133        timestamp = f"{self.created_at:%Y/%m/%d %X}" if self.created_at else "N/A"
134        return f"Previous Password (user: {self.user_id}, recorded: {timestamp})"
135
136    @classmethod
137    def create_for_user(cls, user: User, password: str):
138        # To check users' passwords against Have I been Pwned, we need the first 5 chars
139        # of the password hashed with SHA1 without a salt...
140        pw_hash_sha1 = sha1(password.encode("utf-8")).hexdigest()  # nosec
141        # ...however that'll give us a list of hashes from HIBP, and to compare that we still
142        # need a full unsalted SHA1 of the password. We don't want to save that directly in
143        # the database, so we hash that SHA1 again with a modern hashing alg,
144        # and then when we check users' passwords against HIBP we can use `check_password`
145        # which will take care of this.
146        hibp_hash_hash = make_password(pw_hash_sha1)
147        return cls.objects.create(
148            user=user,
149            old_password=password,
150            hibp_prefix_sha1=pw_hash_sha1[:5],
151            hibp_pw_hash=hibp_hash_hash,
152        )

UserPasswordHistory(id, user, old_password, created_at, hibp_prefix_sha1, hibp_pw_hash)

user

Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.

In the example::

class Child(Model):
    parent = ForeignKey(Parent, related_name='children')

Child.parent is a ForwardManyToOneDescriptor instance.

def old_password(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def created_at(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def hibp_prefix_sha1(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def hibp_pw_hash(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

@classmethod
def create_for_user(cls, user: authentik.core.models.User, password: str):
136    @classmethod
137    def create_for_user(cls, user: User, password: str):
138        # To check users' passwords against Have I been Pwned, we need the first 5 chars
139        # of the password hashed with SHA1 without a salt...
140        pw_hash_sha1 = sha1(password.encode("utf-8")).hexdigest()  # nosec
141        # ...however that'll give us a list of hashes from HIBP, and to compare that we still
142        # need a full unsalted SHA1 of the password. We don't want to save that directly in
143        # the database, so we hash that SHA1 again with a modern hashing alg,
144        # and then when we check users' passwords against HIBP we can use `check_password`
145        # which will take care of this.
146        hibp_hash_hash = make_password(pw_hash_sha1)
147        return cls.objects.create(
148            user=user,
149            old_password=password,
150            hibp_prefix_sha1=pw_hash_sha1[:5],
151            hibp_pw_hash=hibp_hash_hash,
152        )
user_id
def get_next_by_created_at(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def get_previous_by_created_at(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

def id(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def objects(unknown):

The type of the None singleton.

class UserPasswordHistory.DoesNotExist(django.core.exceptions.ObjectDoesNotExist):

The requested object does not exist

class UserPasswordHistory.MultipleObjectsReturned(django.core.exceptions.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.