authentik.policies.password.models
password policy
1"""password policy""" 2 3import re 4from hashlib import sha1 5 6from django.db import models 7from django.utils.translation import gettext as _ 8from rest_framework.serializers import BaseSerializer 9from structlog.stdlib import get_logger 10from zxcvbn import zxcvbn 11 12from authentik.lib.utils.http import get_http_session 13from authentik.policies.models import Policy 14from authentik.policies.types import PolicyRequest, PolicyResult 15from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT 16 17LOGGER = get_logger() 18RE_LOWER = re.compile("[a-z]") 19RE_UPPER = re.compile("[A-Z]") 20RE_DIGITS = re.compile("[0-9]") 21 22 23class PasswordPolicy(Policy): 24 """Policy to make sure passwords have certain properties""" 25 26 password_field = models.TextField( 27 default="password", 28 help_text=_("Field key to check, field keys defined in Prompt stages are available."), 29 ) 30 31 check_static_rules = models.BooleanField(default=True) 32 check_have_i_been_pwned = models.BooleanField(default=False) 33 check_zxcvbn = models.BooleanField(default=False) 34 35 amount_digits = models.PositiveIntegerField(default=0) 36 amount_uppercase = models.PositiveIntegerField(default=0) 37 amount_lowercase = models.PositiveIntegerField(default=0) 38 amount_symbols = models.PositiveIntegerField(default=0) 39 length_min = models.PositiveIntegerField(default=0) 40 symbol_charset = models.TextField(default=r"!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ ") 41 error_message = models.TextField(blank=True) 42 43 hibp_allowed_count = models.PositiveIntegerField( 44 default=0, 45 help_text=_("How many times the password hash is allowed to be on haveibeenpwned"), 46 ) 47 48 zxcvbn_score_threshold = models.PositiveIntegerField( 49 default=2, 50 help_text=_("If the zxcvbn score is equal or less than this value, the policy will fail."), 51 ) 52 53 @property 54 def serializer(self) -> type[BaseSerializer]: 55 from authentik.policies.password.api import PasswordPolicySerializer 56 57 return PasswordPolicySerializer 58 59 @property 60 def component(self) -> str: 61 return "ak-policy-password-form" 62 63 def passes(self, request: PolicyRequest) -> PolicyResult: 64 password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get( 65 self.password_field, request.context.get(self.password_field) 66 ) 67 if not password: 68 LOGGER.warning( 69 "Password field not set in Policy Request", 70 field=self.password_field, 71 fields=request.context.keys(), 72 ) 73 return PolicyResult(False, _("Password not set in context")) 74 password = str(password) 75 76 if self.check_static_rules: 77 static_result = self.passes_static(password, request) 78 if not static_result.passing: 79 return static_result 80 if self.check_have_i_been_pwned: 81 hibp_result = self.passes_hibp(password, request) 82 if not hibp_result.passing: 83 return hibp_result 84 if self.check_zxcvbn: 85 zxcvbn_result = self.passes_zxcvbn(password, request) 86 if not zxcvbn_result.passing: 87 return zxcvbn_result 88 return PolicyResult(True) 89 90 def passes_static(self, password: str, request: PolicyRequest) -> PolicyResult: 91 """Check static rules""" 92 error_message = self.error_message 93 if error_message == "": 94 error_message = _("Invalid password.") 95 96 if len(password) < self.length_min: 97 LOGGER.debug("password failed", check="static", reason="length") 98 return PolicyResult(False, self.error_message) 99 100 if self.amount_digits > 0 and len(RE_DIGITS.findall(password)) < self.amount_digits: 101 LOGGER.debug("password failed", check="static", reason="amount_digits") 102 return PolicyResult(False, self.error_message) 103 if self.amount_lowercase > 0 and len(RE_LOWER.findall(password)) < self.amount_lowercase: 104 LOGGER.debug("password failed", check="static", reason="amount_lowercase") 105 return PolicyResult(False, self.error_message) 106 if self.amount_uppercase > 0 and len(RE_UPPER.findall(password)) < self.amount_uppercase: 107 LOGGER.debug("password failed", check="static", reason="amount_uppercase") 108 return PolicyResult(False, self.error_message) 109 if self.amount_symbols > 0: 110 count = 0 111 for symbol in self.symbol_charset: 112 count += password.count(symbol) 113 if count < self.amount_symbols: 114 LOGGER.debug("password failed", check="static", reason="amount_symbols") 115 return PolicyResult(False, self.error_message) 116 117 return PolicyResult(True) 118 119 def check_hibp(self, short_hash: str) -> str: 120 """Check the haveibeenpwned API""" 121 url = f"https://api.pwnedpasswords.com/range/{short_hash}" 122 return get_http_session().get(url).text 123 124 def passes_hibp(self, password: str, request: PolicyRequest) -> PolicyResult: 125 """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5 126 characters of Password in request and checks if full hash is in response. Returns 0 127 if Password is not in result otherwise the count of how many times it was used.""" 128 pw_hash = sha1(password.encode("utf-8")).hexdigest() # nosec 129 result = self.check_hibp(pw_hash[:5]) 130 final_count = 0 131 for line in result.split("\r\n"): 132 full_hash, count = line.split(":") 133 if pw_hash[5:] == full_hash.lower(): 134 final_count = int(count) 135 LOGGER.debug("got hibp result", count=final_count, hash=pw_hash[:5]) 136 if final_count > self.hibp_allowed_count: 137 LOGGER.debug("password failed", check="hibp", count=final_count) 138 message = _("Password exists on {count} online lists.".format(count=final_count)) 139 return PolicyResult(False, message) 140 return PolicyResult(True) 141 142 def passes_zxcvbn(self, password: str, request: PolicyRequest) -> PolicyResult: 143 """Check Dropbox's zxcvbn password estimator""" 144 user_inputs = [] 145 if request.user.is_authenticated: 146 user_inputs.append(request.user.username) 147 user_inputs.append(request.user.name) 148 user_inputs.append(request.user.email) 149 if request.http_request: 150 user_inputs.append(request.http_request.brand.branding_title) 151 # Only calculate result for the first 72 characters, as with over 100 char 152 # long passwords we can be reasonably sure that they'll surpass the score anyways 153 # See https://github.com/dropbox/zxcvbn#runtime-latency 154 results = zxcvbn(password[:72], user_inputs) 155 LOGGER.debug("password failed", check="zxcvbn", score=results["score"]) 156 result = PolicyResult(results["score"] > self.zxcvbn_score_threshold) 157 if not result.passing: 158 result.messages += tuple((_("Password is too weak."),)) 159 if isinstance(results["feedback"]["warning"], list): 160 result.messages += tuple(results["feedback"]["warning"]) 161 if isinstance(results["feedback"]["suggestions"], list): 162 result.messages += tuple(results["feedback"]["suggestions"]) 163 return result 164 165 class Meta(Policy.PolicyMeta): 166 verbose_name = _("Password Policy") 167 verbose_name_plural = _("Password Policies")
24class PasswordPolicy(Policy): 25 """Policy to make sure passwords have certain properties""" 26 27 password_field = models.TextField( 28 default="password", 29 help_text=_("Field key to check, field keys defined in Prompt stages are available."), 30 ) 31 32 check_static_rules = models.BooleanField(default=True) 33 check_have_i_been_pwned = models.BooleanField(default=False) 34 check_zxcvbn = models.BooleanField(default=False) 35 36 amount_digits = models.PositiveIntegerField(default=0) 37 amount_uppercase = models.PositiveIntegerField(default=0) 38 amount_lowercase = models.PositiveIntegerField(default=0) 39 amount_symbols = models.PositiveIntegerField(default=0) 40 length_min = models.PositiveIntegerField(default=0) 41 symbol_charset = models.TextField(default=r"!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ ") 42 error_message = models.TextField(blank=True) 43 44 hibp_allowed_count = models.PositiveIntegerField( 45 default=0, 46 help_text=_("How many times the password hash is allowed to be on haveibeenpwned"), 47 ) 48 49 zxcvbn_score_threshold = models.PositiveIntegerField( 50 default=2, 51 help_text=_("If the zxcvbn score is equal or less than this value, the policy will fail."), 52 ) 53 54 @property 55 def serializer(self) -> type[BaseSerializer]: 56 from authentik.policies.password.api import PasswordPolicySerializer 57 58 return PasswordPolicySerializer 59 60 @property 61 def component(self) -> str: 62 return "ak-policy-password-form" 63 64 def passes(self, request: PolicyRequest) -> PolicyResult: 65 password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get( 66 self.password_field, request.context.get(self.password_field) 67 ) 68 if not password: 69 LOGGER.warning( 70 "Password field not set in Policy Request", 71 field=self.password_field, 72 fields=request.context.keys(), 73 ) 74 return PolicyResult(False, _("Password not set in context")) 75 password = str(password) 76 77 if self.check_static_rules: 78 static_result = self.passes_static(password, request) 79 if not static_result.passing: 80 return static_result 81 if self.check_have_i_been_pwned: 82 hibp_result = self.passes_hibp(password, request) 83 if not hibp_result.passing: 84 return hibp_result 85 if self.check_zxcvbn: 86 zxcvbn_result = self.passes_zxcvbn(password, request) 87 if not zxcvbn_result.passing: 88 return zxcvbn_result 89 return PolicyResult(True) 90 91 def passes_static(self, password: str, request: PolicyRequest) -> PolicyResult: 92 """Check static rules""" 93 error_message = self.error_message 94 if error_message == "": 95 error_message = _("Invalid password.") 96 97 if len(password) < self.length_min: 98 LOGGER.debug("password failed", check="static", reason="length") 99 return PolicyResult(False, self.error_message) 100 101 if self.amount_digits > 0 and len(RE_DIGITS.findall(password)) < self.amount_digits: 102 LOGGER.debug("password failed", check="static", reason="amount_digits") 103 return PolicyResult(False, self.error_message) 104 if self.amount_lowercase > 0 and len(RE_LOWER.findall(password)) < self.amount_lowercase: 105 LOGGER.debug("password failed", check="static", reason="amount_lowercase") 106 return PolicyResult(False, self.error_message) 107 if self.amount_uppercase > 0 and len(RE_UPPER.findall(password)) < self.amount_uppercase: 108 LOGGER.debug("password failed", check="static", reason="amount_uppercase") 109 return PolicyResult(False, self.error_message) 110 if self.amount_symbols > 0: 111 count = 0 112 for symbol in self.symbol_charset: 113 count += password.count(symbol) 114 if count < self.amount_symbols: 115 LOGGER.debug("password failed", check="static", reason="amount_symbols") 116 return PolicyResult(False, self.error_message) 117 118 return PolicyResult(True) 119 120 def check_hibp(self, short_hash: str) -> str: 121 """Check the haveibeenpwned API""" 122 url = f"https://api.pwnedpasswords.com/range/{short_hash}" 123 return get_http_session().get(url).text 124 125 def passes_hibp(self, password: str, request: PolicyRequest) -> PolicyResult: 126 """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5 127 characters of Password in request and checks if full hash is in response. Returns 0 128 if Password is not in result otherwise the count of how many times it was used.""" 129 pw_hash = sha1(password.encode("utf-8")).hexdigest() # nosec 130 result = self.check_hibp(pw_hash[:5]) 131 final_count = 0 132 for line in result.split("\r\n"): 133 full_hash, count = line.split(":") 134 if pw_hash[5:] == full_hash.lower(): 135 final_count = int(count) 136 LOGGER.debug("got hibp result", count=final_count, hash=pw_hash[:5]) 137 if final_count > self.hibp_allowed_count: 138 LOGGER.debug("password failed", check="hibp", count=final_count) 139 message = _("Password exists on {count} online lists.".format(count=final_count)) 140 return PolicyResult(False, message) 141 return PolicyResult(True) 142 143 def passes_zxcvbn(self, password: str, request: PolicyRequest) -> PolicyResult: 144 """Check Dropbox's zxcvbn password estimator""" 145 user_inputs = [] 146 if request.user.is_authenticated: 147 user_inputs.append(request.user.username) 148 user_inputs.append(request.user.name) 149 user_inputs.append(request.user.email) 150 if request.http_request: 151 user_inputs.append(request.http_request.brand.branding_title) 152 # Only calculate result for the first 72 characters, as with over 100 char 153 # long passwords we can be reasonably sure that they'll surpass the score anyways 154 # See https://github.com/dropbox/zxcvbn#runtime-latency 155 results = zxcvbn(password[:72], user_inputs) 156 LOGGER.debug("password failed", check="zxcvbn", score=results["score"]) 157 result = PolicyResult(results["score"] > self.zxcvbn_score_threshold) 158 if not result.passing: 159 result.messages += tuple((_("Password is too weak."),)) 160 if isinstance(results["feedback"]["warning"], list): 161 result.messages += tuple(results["feedback"]["warning"]) 162 if isinstance(results["feedback"]["suggestions"], list): 163 result.messages += tuple(results["feedback"]["suggestions"]) 164 return result 165 166 class Meta(Policy.PolicyMeta): 167 verbose_name = _("Password Policy") 168 verbose_name_plural = _("Password Policies")
Policy to make sure passwords have certain properties
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
54 @property 55 def serializer(self) -> type[BaseSerializer]: 56 from authentik.policies.password.api import PasswordPolicySerializer 57 58 return PasswordPolicySerializer
Get serializer for this model
64 def passes(self, request: PolicyRequest) -> PolicyResult: 65 password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get( 66 self.password_field, request.context.get(self.password_field) 67 ) 68 if not password: 69 LOGGER.warning( 70 "Password field not set in Policy Request", 71 field=self.password_field, 72 fields=request.context.keys(), 73 ) 74 return PolicyResult(False, _("Password not set in context")) 75 password = str(password) 76 77 if self.check_static_rules: 78 static_result = self.passes_static(password, request) 79 if not static_result.passing: 80 return static_result 81 if self.check_have_i_been_pwned: 82 hibp_result = self.passes_hibp(password, request) 83 if not hibp_result.passing: 84 return hibp_result 85 if self.check_zxcvbn: 86 zxcvbn_result = self.passes_zxcvbn(password, request) 87 if not zxcvbn_result.passing: 88 return zxcvbn_result 89 return PolicyResult(True)
Check if request passes this policy
91 def passes_static(self, password: str, request: PolicyRequest) -> PolicyResult: 92 """Check static rules""" 93 error_message = self.error_message 94 if error_message == "": 95 error_message = _("Invalid password.") 96 97 if len(password) < self.length_min: 98 LOGGER.debug("password failed", check="static", reason="length") 99 return PolicyResult(False, self.error_message) 100 101 if self.amount_digits > 0 and len(RE_DIGITS.findall(password)) < self.amount_digits: 102 LOGGER.debug("password failed", check="static", reason="amount_digits") 103 return PolicyResult(False, self.error_message) 104 if self.amount_lowercase > 0 and len(RE_LOWER.findall(password)) < self.amount_lowercase: 105 LOGGER.debug("password failed", check="static", reason="amount_lowercase") 106 return PolicyResult(False, self.error_message) 107 if self.amount_uppercase > 0 and len(RE_UPPER.findall(password)) < self.amount_uppercase: 108 LOGGER.debug("password failed", check="static", reason="amount_uppercase") 109 return PolicyResult(False, self.error_message) 110 if self.amount_symbols > 0: 111 count = 0 112 for symbol in self.symbol_charset: 113 count += password.count(symbol) 114 if count < self.amount_symbols: 115 LOGGER.debug("password failed", check="static", reason="amount_symbols") 116 return PolicyResult(False, self.error_message) 117 118 return PolicyResult(True)
Check static rules
120 def check_hibp(self, short_hash: str) -> str: 121 """Check the haveibeenpwned API""" 122 url = f"https://api.pwnedpasswords.com/range/{short_hash}" 123 return get_http_session().get(url).text
Check the haveibeenpwned API
125 def passes_hibp(self, password: str, request: PolicyRequest) -> PolicyResult: 126 """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5 127 characters of Password in request and checks if full hash is in response. Returns 0 128 if Password is not in result otherwise the count of how many times it was used.""" 129 pw_hash = sha1(password.encode("utf-8")).hexdigest() # nosec 130 result = self.check_hibp(pw_hash[:5]) 131 final_count = 0 132 for line in result.split("\r\n"): 133 full_hash, count = line.split(":") 134 if pw_hash[5:] == full_hash.lower(): 135 final_count = int(count) 136 LOGGER.debug("got hibp result", count=final_count, hash=pw_hash[:5]) 137 if final_count > self.hibp_allowed_count: 138 LOGGER.debug("password failed", check="hibp", count=final_count) 139 message = _("Password exists on {count} online lists.".format(count=final_count)) 140 return PolicyResult(False, message) 141 return PolicyResult(True)
Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5 characters of Password in request and checks if full hash is in response. Returns 0 if Password is not in result otherwise the count of how many times it was used.
143 def passes_zxcvbn(self, password: str, request: PolicyRequest) -> PolicyResult: 144 """Check Dropbox's zxcvbn password estimator""" 145 user_inputs = [] 146 if request.user.is_authenticated: 147 user_inputs.append(request.user.username) 148 user_inputs.append(request.user.name) 149 user_inputs.append(request.user.email) 150 if request.http_request: 151 user_inputs.append(request.http_request.brand.branding_title) 152 # Only calculate result for the first 72 characters, as with over 100 char 153 # long passwords we can be reasonably sure that they'll surpass the score anyways 154 # See https://github.com/dropbox/zxcvbn#runtime-latency 155 results = zxcvbn(password[:72], user_inputs) 156 LOGGER.debug("password failed", check="zxcvbn", score=results["score"]) 157 result = PolicyResult(results["score"] > self.zxcvbn_score_threshold) 158 if not result.passing: 159 result.messages += tuple((_("Password is too weak."),)) 160 if isinstance(results["feedback"]["warning"], list): 161 result.messages += tuple(results["feedback"]["warning"]) 162 if isinstance(results["feedback"]["suggestions"], list): 163 result.messages += tuple(results["feedback"]["suggestions"]) 164 return result
Check Dropbox's zxcvbn password estimator
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.policies.models.Policy
- policy_uuid
- name
- execution_logging
- objects
- PolicyMeta
- created
- last_updated
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- bindings
- dummypolicy
- eventmatcherpolicy
- passwordexpirypolicy
- reputationpolicy
- expressionpolicy
- geoippolicy
- promptstage_set
- passwordpolicy
- uniquepasswordpolicy
The requested object does not exist
The query returned multiple objects when only one was expected.