authentik.enterprise.policies.unique_password.models
from hashlib import sha1

from django.contrib.auth.hashers import identify_hasher, make_password
from django.db import models
from django.utils.translation import gettext as _
from rest_framework.serializers import BaseSerializer
from structlog.stdlib import get_logger

from authentik.core.models import User
from authentik.policies.models import Policy
from authentik.policies.types import PolicyRequest, PolicyResult
from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT

LOGGER = get_logger()


class UniquePasswordPolicy(Policy):
    """This policy prevents users from reusing old passwords."""

    # Key used to look up the candidate password in the request context.
    password_field = models.TextField(
        default="password",
        help_text=_("Field key to check, field keys defined in Prompt stages are available."),
    )

    # Caps how many of the most recent history rows `passes` compares against.
    # Per the original note, it also controls how many old passwords are stored.
    num_historical_passwords = models.PositiveIntegerField(
        default=1,
        help_text=_("Number of passwords to check against."),
    )

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this policy model."""
        from authentik.enterprise.policies.unique_password.api import (
            UniquePasswordPolicySerializer as serializer_cls,
        )

        return serializer_cls

    @property
    def component(self) -> str:
        """Return the identifier string of the form component for this policy."""
        return "ak-policy-password-uniqueness-form"

    def passes(self, request: PolicyRequest) -> PolicyResult:
        """Check the candidate password against the user's recent password history.

        The candidate is read from the prompt data in the request context, falling
        back to the top level of the context. A missing or empty candidate fails the
        policy. Otherwise the policy fails only if the candidate verifies against one
        of the `num_historical_passwords` most recent history rows of `request.user`.
        """
        from authentik.enterprise.policies.unique_password.models import UserPasswordHistory

        field_key = self.password_field
        prompt_data = request.context.get(PLAN_CONTEXT_PROMPT, {})
        candidate = prompt_data.get(field_key, request.context.get(field_key))
        if not candidate:
            LOGGER.warning(
                "Password field not found in request when checking UniquePasswordPolicy",
                field=field_key,
                fields=request.context.keys(),
            )
            return PolicyResult(False, _("Password not set in context"))
        candidate = str(candidate)

        limit = self.num_historical_passwords
        if not limit:
            # A limit of zero means there is nothing to compare against.
            return PolicyResult(True)

        recent_entries = UserPasswordHistory.objects.filter(user=request.user).order_by(
            "-created_at"
        )[:limit]

        # `any` stops at the first match; rows without a stored value are skipped.
        # Authentik does not consider timing attacks on old passwords to be an
        # attack surface, so the early exit is acceptable.
        reused = any(
            entry.old_password
            and self._passwords_match(new_password=candidate, old_password=entry.old_password)
            for entry in recent_entries
        )
        if not reused:
            return PolicyResult(True)
        return PolicyResult(
            False,
            _("This password has been used previously. Please choose a different one."),
        )

    def _passwords_match(self, *, new_password: str, old_password: str) -> bool:
        """Return whether `new_password` verifies against the encoded `old_password`.

        If no hasher can be identified for `old_password`, a warning is logged and
        the pair is treated as not matching.
        """
        try:
            algorithm = identify_hasher(old_password)
        except ValueError:
            LOGGER.warning(
                "Skipping password; could not load hash algorithm",
            )
            return False
        return algorithm.verify(new_password, old_password)

    @classmethod
    def is_in_use(cls):
        """Report whether any policy of this class is currently in use.

        A policy counts as in use when the in-use policy bindings include this
        class, or when at least one instance is attached to a PromptStage.

        Returns:
            bool: True if any policy is in use, False otherwise
        """
        from authentik.policies.models import PolicyBinding

        # Short-circuit: the PromptStage query only runs if no binding is found.
        return (
            PolicyBinding.in_use.for_policy(cls).exists()
            or cls.objects.filter(promptstage__isnull=False).exists()
        )

    class Meta(Policy.PolicyMeta):
        verbose_name = _("Password Uniqueness Policy")
        verbose_name_plural = _("Password Uniqueness Policies")


class UserPasswordHistory(models.Model):
    """One previously used password of a user, plus data derived from it for HIBP checks."""

    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="old_passwords")
    # Mimics the column type of AbstractBaseUser.password
    old_password = models.CharField(max_length=128)
    created_at = models.DateTimeField(auto_now_add=True)

    # First five hex characters of the unsalted SHA-1 computed in `create_for_user`.
    hibp_prefix_sha1 = models.CharField(max_length=5)
    # Output of `make_password` applied to the full unsalted SHA-1 hex digest.
    hibp_pw_hash = models.TextField()

    class Meta:
        verbose_name = _("User Password History")

    def __str__(self) -> str:
        if self.created_at:
            recorded = format(self.created_at, "%Y/%m/%d %X")
        else:
            recorded = "N/A"
        return f"Previous Password (user: {self.user_id}, recorded: {recorded})"

    @classmethod
    def create_for_user(cls, user: User, password: str):
        """Create and return a history row for `user` from `password`.

        NOTE(review): `password` is stored unchanged in `old_password`, which
        `UniquePasswordPolicy._passwords_match` later treats as an encoded hash,
        while the SHA-1 below is only useful for HIBP if `password` is the
        plaintext. Confirm with the caller which form is passed in.
        """
        # Have I Been Pwned lookups need the first 5 characters of the unsalted
        # SHA-1 of the password.
        unsalted_sha1 = sha1(password.encode("utf-8")).hexdigest()  # nosec
        # Comparing against the hashes HIBP returns still requires the full unsalted
        # SHA-1. It is not stored directly; it is hashed again with a modern
        # algorithm so that `check_password` can be used for the comparison later.
        return cls.objects.create(
            user=user,
            old_password=password,
            hibp_prefix_sha1=unsalted_sha1[:5],
            hibp_pw_hash=make_password(unsalted_sha1),
        )
class UniquePasswordPolicy(Policy):
    """This policy prevents users from reusing old passwords."""

    # Key used to look up the candidate password in the request context.
    password_field = models.TextField(
        default="password",
        help_text=_("Field key to check, field keys defined in Prompt stages are available."),
    )

    # Caps how many of the most recent history rows `passes` compares against.
    # Per the original note, it also controls how many old passwords are stored.
    num_historical_passwords = models.PositiveIntegerField(
        default=1,
        help_text=_("Number of passwords to check against."),
    )

    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this policy model."""
        from authentik.enterprise.policies.unique_password.api import (
            UniquePasswordPolicySerializer as serializer_cls,
        )

        return serializer_cls

    @property
    def component(self) -> str:
        """Return the identifier string of the form component for this policy."""
        return "ak-policy-password-uniqueness-form"

    def passes(self, request: PolicyRequest) -> PolicyResult:
        """Check the candidate password against the user's recent password history.

        The candidate is read from the prompt data in the request context, falling
        back to the top level of the context. A missing or empty candidate fails the
        policy. Otherwise the policy fails only if the candidate verifies against one
        of the `num_historical_passwords` most recent history rows of `request.user`.
        """
        from authentik.enterprise.policies.unique_password.models import UserPasswordHistory

        field_key = self.password_field
        prompt_data = request.context.get(PLAN_CONTEXT_PROMPT, {})
        candidate = prompt_data.get(field_key, request.context.get(field_key))
        if not candidate:
            LOGGER.warning(
                "Password field not found in request when checking UniquePasswordPolicy",
                field=field_key,
                fields=request.context.keys(),
            )
            return PolicyResult(False, _("Password not set in context"))
        candidate = str(candidate)

        limit = self.num_historical_passwords
        if not limit:
            # A limit of zero means there is nothing to compare against.
            return PolicyResult(True)

        recent_entries = UserPasswordHistory.objects.filter(user=request.user).order_by(
            "-created_at"
        )[:limit]

        # `any` stops at the first match; rows without a stored value are skipped.
        # Authentik does not consider timing attacks on old passwords to be an
        # attack surface, so the early exit is acceptable.
        reused = any(
            entry.old_password
            and self._passwords_match(new_password=candidate, old_password=entry.old_password)
            for entry in recent_entries
        )
        if not reused:
            return PolicyResult(True)
        return PolicyResult(
            False,
            _("This password has been used previously. Please choose a different one."),
        )

    def _passwords_match(self, *, new_password: str, old_password: str) -> bool:
        """Return whether `new_password` verifies against the encoded `old_password`.

        If no hasher can be identified for `old_password`, a warning is logged and
        the pair is treated as not matching.
        """
        try:
            algorithm = identify_hasher(old_password)
        except ValueError:
            LOGGER.warning(
                "Skipping password; could not load hash algorithm",
            )
            return False
        return algorithm.verify(new_password, old_password)

    @classmethod
    def is_in_use(cls):
        """Report whether any policy of this class is currently in use.

        A policy counts as in use when the in-use policy bindings include this
        class, or when at least one instance is attached to a PromptStage.

        Returns:
            bool: True if any policy is in use, False otherwise
        """
        from authentik.policies.models import PolicyBinding

        # Short-circuit: the PromptStage query only runs if no binding is found.
        return (
            PolicyBinding.in_use.for_policy(cls).exists()
            or cls.objects.filter(promptstage__isnull=False).exists()
        )

    class Meta(Policy.PolicyMeta):
        verbose_name = _("Password Uniqueness Policy")
        verbose_name_plural = _("Password Uniqueness Policies")
This policy prevents users from reusing old passwords.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@property
def serializer(self) -> type[BaseSerializer]:
    """Return the serializer class for this policy model.

    The serializer is imported inside the property rather than at module level
    (presumably to avoid a circular import with the api module; confirm).
    """
    from authentik.enterprise.policies.unique_password.api import (
        UniquePasswordPolicySerializer as serializer_cls,
    )

    return serializer_cls
Get serializer for this model
def passes(self, request: PolicyRequest) -> PolicyResult:
    """Check the candidate password against the user's recent password history.

    The candidate is read from the prompt data in the request context, falling
    back to the top level of the context. A missing or empty candidate fails the
    policy. Otherwise the policy fails only if the candidate verifies against one
    of the `num_historical_passwords` most recent history rows of `request.user`.
    """
    from authentik.enterprise.policies.unique_password.models import UserPasswordHistory

    field_key = self.password_field
    prompt_data = request.context.get(PLAN_CONTEXT_PROMPT, {})
    candidate = prompt_data.get(field_key, request.context.get(field_key))
    if not candidate:
        LOGGER.warning(
            "Password field not found in request when checking UniquePasswordPolicy",
            field=field_key,
            fields=request.context.keys(),
        )
        return PolicyResult(False, _("Password not set in context"))
    candidate = str(candidate)

    limit = self.num_historical_passwords
    if not limit:
        # A limit of zero means there is nothing to compare against.
        return PolicyResult(True)

    recent_entries = UserPasswordHistory.objects.filter(user=request.user).order_by(
        "-created_at"
    )[:limit]

    # `any` stops at the first match; rows without a stored value are skipped.
    # Authentik does not consider timing attacks on old passwords to be an
    # attack surface, so the early exit is acceptable.
    reused = any(
        entry.old_password
        and self._passwords_match(new_password=candidate, old_password=entry.old_password)
        for entry in recent_entries
    )
    if not reused:
        return PolicyResult(True)
    return PolicyResult(
        False,
        _("This password has been used previously. Please choose a different one."),
    )
Check if request passes this policy
@classmethod
def is_in_use(cls):
    """Report whether any policy of this class is currently in use.

    A policy counts as in use when the in-use policy bindings include this
    class, or when at least one instance is attached to a PromptStage.

    Returns:
        bool: True if any policy is in use, False otherwise
    """
    from authentik.policies.models import PolicyBinding

    # Short-circuit: the PromptStage query only runs if no binding is found.
    return (
        PolicyBinding.in_use.for_policy(cls).exists()
        or cls.objects.filter(promptstage__isnull=False).exists()
    )
Check if any UniquePasswordPolicy is in use, either through policy bindings or direct attachment to a PromptStage.
Returns: bool: True if any policy is in use, False otherwise
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.policies.models.Policy
- policy_uuid
- name
- execution_logging
- objects
- PolicyMeta
- created
- last_updated
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- bindings
- dummypolicy
- eventmatcherpolicy
- passwordexpirypolicy
- reputationpolicy
- expressionpolicy
- geoippolicy
- promptstage_set
- passwordpolicy
- uniquepasswordpolicy
The requested object does not exist
The query returned multiple objects when only one was expected.
class UserPasswordHistory(models.Model):
    """One previously used password of a user, plus data derived from it for HIBP checks."""

    user = models.ForeignKey(User, on_delete=models.CASCADE, related_name="old_passwords")
    # Mimics the column type of AbstractBaseUser.password
    old_password = models.CharField(max_length=128)
    created_at = models.DateTimeField(auto_now_add=True)

    # First five hex characters of the unsalted SHA-1 computed in `create_for_user`.
    hibp_prefix_sha1 = models.CharField(max_length=5)
    # Output of `make_password` applied to the full unsalted SHA-1 hex digest.
    hibp_pw_hash = models.TextField()

    class Meta:
        verbose_name = _("User Password History")

    def __str__(self) -> str:
        if self.created_at:
            recorded = format(self.created_at, "%Y/%m/%d %X")
        else:
            recorded = "N/A"
        return f"Previous Password (user: {self.user_id}, recorded: {recorded})"

    @classmethod
    def create_for_user(cls, user: User, password: str):
        """Create and return a history row for `user` from `password`.

        NOTE(review): `password` is stored unchanged in `old_password`, which
        `UniquePasswordPolicy._passwords_match` later treats as an encoded hash,
        while the SHA-1 below is only useful for HIBP if `password` is the
        plaintext. Confirm with the caller which form is passed in.
        """
        # Have I Been Pwned lookups need the first 5 characters of the unsalted
        # SHA-1 of the password.
        unsalted_sha1 = sha1(password.encode("utf-8")).hexdigest()  # nosec
        # Comparing against the hashes HIBP returns still requires the full unsalted
        # SHA-1. It is not stored directly; it is hashed again with a modern
        # algorithm so that `check_password` can be used for the comparison later.
        return cls.objects.create(
            user=user,
            old_password=password,
            hibp_prefix_sha1=unsalted_sha1[:5],
            hibp_pw_hash=make_password(unsalted_sha1),
        )
UserPasswordHistory(id, user, old_password, created_at, hibp_prefix_sha1, hibp_pw_hash)
Accessor to the related object on the forward side of a many-to-one or one-to-one (via ForwardOneToOneDescriptor subclass) relation.
In the example::
class Child(Model):
parent = ForeignKey(Parent, related_name='children')
Child.parent is a ForwardManyToOneDescriptor instance.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
@classmethod
def create_for_user(cls, user: User, password: str):
    """Create and return a history row for `user` from `password`.

    NOTE(review): `password` is stored unchanged in `old_password`, which
    `UniquePasswordPolicy._passwords_match` later treats as an encoded hash,
    while the SHA-1 below is only useful for HIBP if `password` is the
    plaintext. Confirm with the caller which form is passed in.
    """
    # Have I Been Pwned lookups need the first 5 characters of the unsalted
    # SHA-1 of the password.
    unsalted_sha1 = sha1(password.encode("utf-8")).hexdigest()  # nosec
    # Comparing against the hashes HIBP returns still requires the full unsalted
    # SHA-1. It is not stored directly; it is hashed again with a modern
    # algorithm so that `check_password` can be used for the comparison later.
    return cls.objects.create(
        user=user,
        old_password=password,
        hibp_prefix_sha1=unsalted_sha1[:5],
        hibp_pw_hash=make_password(unsalted_sha1),
    )
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
The requested object does not exist
The query returned multiple objects when only one was expected.