authentik.policies.password.models

password policy

  1"""password policy"""
  2
  3import re
  4from hashlib import sha1
  5
  6from django.db import models
  7from django.utils.translation import gettext as _
  8from rest_framework.serializers import BaseSerializer
  9from structlog.stdlib import get_logger
 10from zxcvbn import zxcvbn
 11
 12from authentik.lib.utils.http import get_http_session
 13from authentik.policies.models import Policy
 14from authentik.policies.types import PolicyRequest, PolicyResult
 15from authentik.stages.prompt.stage import PLAN_CONTEXT_PROMPT
 16
 17LOGGER = get_logger()
 18RE_LOWER = re.compile("[a-z]")
 19RE_UPPER = re.compile("[A-Z]")
 20RE_DIGITS = re.compile("[0-9]")
 21
 22
 23class PasswordPolicy(Policy):
 24    """Policy to make sure passwords have certain properties"""
 25
 26    password_field = models.TextField(
 27        default="password",
 28        help_text=_("Field key to check, field keys defined in Prompt stages are available."),
 29    )
 30
 31    check_static_rules = models.BooleanField(default=True)
 32    check_have_i_been_pwned = models.BooleanField(default=False)
 33    check_zxcvbn = models.BooleanField(default=False)
 34
 35    amount_digits = models.PositiveIntegerField(default=0)
 36    amount_uppercase = models.PositiveIntegerField(default=0)
 37    amount_lowercase = models.PositiveIntegerField(default=0)
 38    amount_symbols = models.PositiveIntegerField(default=0)
 39    length_min = models.PositiveIntegerField(default=0)
 40    symbol_charset = models.TextField(default=r"!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ ")
 41    error_message = models.TextField(blank=True)
 42
 43    hibp_allowed_count = models.PositiveIntegerField(
 44        default=0,
 45        help_text=_("How many times the password hash is allowed to be on haveibeenpwned"),
 46    )
 47
 48    zxcvbn_score_threshold = models.PositiveIntegerField(
 49        default=2,
 50        help_text=_("If the zxcvbn score is equal or less than this value, the policy will fail."),
 51    )
 52
 53    @property
 54    def serializer(self) -> type[BaseSerializer]:
 55        from authentik.policies.password.api import PasswordPolicySerializer
 56
 57        return PasswordPolicySerializer
 58
 59    @property
 60    def component(self) -> str:
 61        return "ak-policy-password-form"
 62
 63    def passes(self, request: PolicyRequest) -> PolicyResult:
 64        password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
 65            self.password_field, request.context.get(self.password_field)
 66        )
 67        if not password:
 68            LOGGER.warning(
 69                "Password field not set in Policy Request",
 70                field=self.password_field,
 71                fields=request.context.keys(),
 72            )
 73            return PolicyResult(False, _("Password not set in context"))
 74        password = str(password)
 75
 76        if self.check_static_rules:
 77            static_result = self.passes_static(password, request)
 78            if not static_result.passing:
 79                return static_result
 80        if self.check_have_i_been_pwned:
 81            hibp_result = self.passes_hibp(password, request)
 82            if not hibp_result.passing:
 83                return hibp_result
 84        if self.check_zxcvbn:
 85            zxcvbn_result = self.passes_zxcvbn(password, request)
 86            if not zxcvbn_result.passing:
 87                return zxcvbn_result
 88        return PolicyResult(True)
 89
 90    def passes_static(self, password: str, request: PolicyRequest) -> PolicyResult:
 91        """Check static rules"""
 92        error_message = self.error_message
 93        if error_message == "":
 94            error_message = _("Invalid password.")
 95
 96        if len(password) < self.length_min:
 97            LOGGER.debug("password failed", check="static", reason="length")
 98            return PolicyResult(False, self.error_message)
 99
100        if self.amount_digits > 0 and len(RE_DIGITS.findall(password)) < self.amount_digits:
101            LOGGER.debug("password failed", check="static", reason="amount_digits")
102            return PolicyResult(False, self.error_message)
103        if self.amount_lowercase > 0 and len(RE_LOWER.findall(password)) < self.amount_lowercase:
104            LOGGER.debug("password failed", check="static", reason="amount_lowercase")
105            return PolicyResult(False, self.error_message)
106        if self.amount_uppercase > 0 and len(RE_UPPER.findall(password)) < self.amount_uppercase:
107            LOGGER.debug("password failed", check="static", reason="amount_uppercase")
108            return PolicyResult(False, self.error_message)
109        if self.amount_symbols > 0:
110            count = 0
111            for symbol in self.symbol_charset:
112                count += password.count(symbol)
113            if count < self.amount_symbols:
114                LOGGER.debug("password failed", check="static", reason="amount_symbols")
115                return PolicyResult(False, self.error_message)
116
117        return PolicyResult(True)
118
119    def check_hibp(self, short_hash: str) -> str:
120        """Check the haveibeenpwned API"""
121        url = f"https://api.pwnedpasswords.com/range/{short_hash}"
122        return get_http_session().get(url).text
123
124    def passes_hibp(self, password: str, request: PolicyRequest) -> PolicyResult:
125        """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5
126        characters of Password in request and checks if full hash is in response. Returns 0
127        if Password is not in result otherwise the count of how many times it was used."""
128        pw_hash = sha1(password.encode("utf-8")).hexdigest()  # nosec
129        result = self.check_hibp(pw_hash[:5])
130        final_count = 0
131        for line in result.split("\r\n"):
132            full_hash, count = line.split(":")
133            if pw_hash[5:] == full_hash.lower():
134                final_count = int(count)
135        LOGGER.debug("got hibp result", count=final_count, hash=pw_hash[:5])
136        if final_count > self.hibp_allowed_count:
137            LOGGER.debug("password failed", check="hibp", count=final_count)
138            message = _("Password exists on {count} online lists.".format(count=final_count))
139            return PolicyResult(False, message)
140        return PolicyResult(True)
141
142    def passes_zxcvbn(self, password: str, request: PolicyRequest) -> PolicyResult:
143        """Check Dropbox's zxcvbn password estimator"""
144        user_inputs = []
145        if request.user.is_authenticated:
146            user_inputs.append(request.user.username)
147            user_inputs.append(request.user.name)
148            user_inputs.append(request.user.email)
149        if request.http_request:
150            user_inputs.append(request.http_request.brand.branding_title)
151        # Only calculate result for the first 72 characters, as with over 100 char
152        # long passwords we can be reasonably sure that they'll surpass the score anyways
153        # See https://github.com/dropbox/zxcvbn#runtime-latency
154        results = zxcvbn(password[:72], user_inputs)
155        LOGGER.debug("password failed", check="zxcvbn", score=results["score"])
156        result = PolicyResult(results["score"] > self.zxcvbn_score_threshold)
157        if not result.passing:
158            result.messages += tuple((_("Password is too weak."),))
159        if isinstance(results["feedback"]["warning"], list):
160            result.messages += tuple(results["feedback"]["warning"])
161        if isinstance(results["feedback"]["suggestions"], list):
162            result.messages += tuple(results["feedback"]["suggestions"])
163        return result
164
165    class Meta(Policy.PolicyMeta):
166        verbose_name = _("Password Policy")
167        verbose_name_plural = _("Password Policies")
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
RE_LOWER = re.compile('[a-z]')
RE_UPPER = re.compile('[A-Z]')
RE_DIGITS = re.compile('[0-9]')
class PasswordPolicy(authentik.policies.models.Policy):
 24class PasswordPolicy(Policy):
 25    """Policy to make sure passwords have certain properties"""
 26
 27    password_field = models.TextField(
 28        default="password",
 29        help_text=_("Field key to check, field keys defined in Prompt stages are available."),
 30    )
 31
 32    check_static_rules = models.BooleanField(default=True)
 33    check_have_i_been_pwned = models.BooleanField(default=False)
 34    check_zxcvbn = models.BooleanField(default=False)
 35
 36    amount_digits = models.PositiveIntegerField(default=0)
 37    amount_uppercase = models.PositiveIntegerField(default=0)
 38    amount_lowercase = models.PositiveIntegerField(default=0)
 39    amount_symbols = models.PositiveIntegerField(default=0)
 40    length_min = models.PositiveIntegerField(default=0)
 41    symbol_charset = models.TextField(default=r"!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~ ")
 42    error_message = models.TextField(blank=True)
 43
 44    hibp_allowed_count = models.PositiveIntegerField(
 45        default=0,
 46        help_text=_("How many times the password hash is allowed to be on haveibeenpwned"),
 47    )
 48
 49    zxcvbn_score_threshold = models.PositiveIntegerField(
 50        default=2,
 51        help_text=_("If the zxcvbn score is equal or less than this value, the policy will fail."),
 52    )
 53
 54    @property
 55    def serializer(self) -> type[BaseSerializer]:
 56        from authentik.policies.password.api import PasswordPolicySerializer
 57
 58        return PasswordPolicySerializer
 59
 60    @property
 61    def component(self) -> str:
 62        return "ak-policy-password-form"
 63
 64    def passes(self, request: PolicyRequest) -> PolicyResult:
 65        password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
 66            self.password_field, request.context.get(self.password_field)
 67        )
 68        if not password:
 69            LOGGER.warning(
 70                "Password field not set in Policy Request",
 71                field=self.password_field,
 72                fields=request.context.keys(),
 73            )
 74            return PolicyResult(False, _("Password not set in context"))
 75        password = str(password)
 76
 77        if self.check_static_rules:
 78            static_result = self.passes_static(password, request)
 79            if not static_result.passing:
 80                return static_result
 81        if self.check_have_i_been_pwned:
 82            hibp_result = self.passes_hibp(password, request)
 83            if not hibp_result.passing:
 84                return hibp_result
 85        if self.check_zxcvbn:
 86            zxcvbn_result = self.passes_zxcvbn(password, request)
 87            if not zxcvbn_result.passing:
 88                return zxcvbn_result
 89        return PolicyResult(True)
 90
 91    def passes_static(self, password: str, request: PolicyRequest) -> PolicyResult:
 92        """Check static rules"""
 93        error_message = self.error_message
 94        if error_message == "":
 95            error_message = _("Invalid password.")
 96
 97        if len(password) < self.length_min:
 98            LOGGER.debug("password failed", check="static", reason="length")
 99            return PolicyResult(False, self.error_message)
100
101        if self.amount_digits > 0 and len(RE_DIGITS.findall(password)) < self.amount_digits:
102            LOGGER.debug("password failed", check="static", reason="amount_digits")
103            return PolicyResult(False, self.error_message)
104        if self.amount_lowercase > 0 and len(RE_LOWER.findall(password)) < self.amount_lowercase:
105            LOGGER.debug("password failed", check="static", reason="amount_lowercase")
106            return PolicyResult(False, self.error_message)
107        if self.amount_uppercase > 0 and len(RE_UPPER.findall(password)) < self.amount_uppercase:
108            LOGGER.debug("password failed", check="static", reason="amount_uppercase")
109            return PolicyResult(False, self.error_message)
110        if self.amount_symbols > 0:
111            count = 0
112            for symbol in self.symbol_charset:
113                count += password.count(symbol)
114            if count < self.amount_symbols:
115                LOGGER.debug("password failed", check="static", reason="amount_symbols")
116                return PolicyResult(False, self.error_message)
117
118        return PolicyResult(True)
119
120    def check_hibp(self, short_hash: str) -> str:
121        """Check the haveibeenpwned API"""
122        url = f"https://api.pwnedpasswords.com/range/{short_hash}"
123        return get_http_session().get(url).text
124
125    def passes_hibp(self, password: str, request: PolicyRequest) -> PolicyResult:
126        """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5
127        characters of Password in request and checks if full hash is in response. Returns 0
128        if Password is not in result otherwise the count of how many times it was used."""
129        pw_hash = sha1(password.encode("utf-8")).hexdigest()  # nosec
130        result = self.check_hibp(pw_hash[:5])
131        final_count = 0
132        for line in result.split("\r\n"):
133            full_hash, count = line.split(":")
134            if pw_hash[5:] == full_hash.lower():
135                final_count = int(count)
136        LOGGER.debug("got hibp result", count=final_count, hash=pw_hash[:5])
137        if final_count > self.hibp_allowed_count:
138            LOGGER.debug("password failed", check="hibp", count=final_count)
139            message = _("Password exists on {count} online lists.".format(count=final_count))
140            return PolicyResult(False, message)
141        return PolicyResult(True)
142
143    def passes_zxcvbn(self, password: str, request: PolicyRequest) -> PolicyResult:
144        """Check Dropbox's zxcvbn password estimator"""
145        user_inputs = []
146        if request.user.is_authenticated:
147            user_inputs.append(request.user.username)
148            user_inputs.append(request.user.name)
149            user_inputs.append(request.user.email)
150        if request.http_request:
151            user_inputs.append(request.http_request.brand.branding_title)
152        # Only calculate result for the first 72 characters, as with over 100 char
153        # long passwords we can be reasonably sure that they'll surpass the score anyways
154        # See https://github.com/dropbox/zxcvbn#runtime-latency
155        results = zxcvbn(password[:72], user_inputs)
156        LOGGER.debug("password failed", check="zxcvbn", score=results["score"])
157        result = PolicyResult(results["score"] > self.zxcvbn_score_threshold)
158        if not result.passing:
159            result.messages += tuple((_("Password is too weak."),))
160        if isinstance(results["feedback"]["warning"], list):
161            result.messages += tuple(results["feedback"]["warning"])
162        if isinstance(results["feedback"]["suggestions"], list):
163            result.messages += tuple(results["feedback"]["suggestions"])
164        return result
165
166    class Meta(Policy.PolicyMeta):
167        verbose_name = _("Password Policy")
168        verbose_name_plural = _("Password Policies")

Policy to make sure passwords have certain properties

def password_field(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def check_static_rules(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def check_have_i_been_pwned(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def check_zxcvbn(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def amount_digits(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def amount_uppercase(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def amount_lowercase(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def amount_symbols(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def length_min(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def symbol_charset(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def error_message(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def hibp_allowed_count(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def zxcvbn_score_threshold(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
54    @property
55    def serializer(self) -> type[BaseSerializer]:
56        from authentik.policies.password.api import PasswordPolicySerializer
57
58        return PasswordPolicySerializer

Get serializer for this model

component: str
60    @property
61    def component(self) -> str:
62        return "ak-policy-password-form"

Return component used to edit this object

64    def passes(self, request: PolicyRequest) -> PolicyResult:
65        password = request.context.get(PLAN_CONTEXT_PROMPT, {}).get(
66            self.password_field, request.context.get(self.password_field)
67        )
68        if not password:
69            LOGGER.warning(
70                "Password field not set in Policy Request",
71                field=self.password_field,
72                fields=request.context.keys(),
73            )
74            return PolicyResult(False, _("Password not set in context"))
75        password = str(password)
76
77        if self.check_static_rules:
78            static_result = self.passes_static(password, request)
79            if not static_result.passing:
80                return static_result
81        if self.check_have_i_been_pwned:
82            hibp_result = self.passes_hibp(password, request)
83            if not hibp_result.passing:
84                return hibp_result
85        if self.check_zxcvbn:
86            zxcvbn_result = self.passes_zxcvbn(password, request)
87            if not zxcvbn_result.passing:
88                return zxcvbn_result
89        return PolicyResult(True)

Check if request passes this policy

def passes_static( self, password: str, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyResult:
 91    def passes_static(self, password: str, request: PolicyRequest) -> PolicyResult:
 92        """Check static rules"""
 93        error_message = self.error_message
 94        if error_message == "":
 95            error_message = _("Invalid password.")
 96
 97        if len(password) < self.length_min:
 98            LOGGER.debug("password failed", check="static", reason="length")
 99            return PolicyResult(False, self.error_message)
100
101        if self.amount_digits > 0 and len(RE_DIGITS.findall(password)) < self.amount_digits:
102            LOGGER.debug("password failed", check="static", reason="amount_digits")
103            return PolicyResult(False, self.error_message)
104        if self.amount_lowercase > 0 and len(RE_LOWER.findall(password)) < self.amount_lowercase:
105            LOGGER.debug("password failed", check="static", reason="amount_lowercase")
106            return PolicyResult(False, self.error_message)
107        if self.amount_uppercase > 0 and len(RE_UPPER.findall(password)) < self.amount_uppercase:
108            LOGGER.debug("password failed", check="static", reason="amount_uppercase")
109            return PolicyResult(False, self.error_message)
110        if self.amount_symbols > 0:
111            count = 0
112            for symbol in self.symbol_charset:
113                count += password.count(symbol)
114            if count < self.amount_symbols:
115                LOGGER.debug("password failed", check="static", reason="amount_symbols")
116                return PolicyResult(False, self.error_message)
117
118        return PolicyResult(True)

Check static rules

def check_hibp(self, short_hash: str) -> str:
120    def check_hibp(self, short_hash: str) -> str:
121        """Check the haveibeenpwned API"""
122        url = f"https://api.pwnedpasswords.com/range/{short_hash}"
123        return get_http_session().get(url).text

Check the haveibeenpwned API

def passes_hibp( self, password: str, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyResult:
125    def passes_hibp(self, password: str, request: PolicyRequest) -> PolicyResult:
126        """Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5
127        characters of Password in request and checks if full hash is in response. Returns 0
128        if Password is not in result otherwise the count of how many times it was used."""
129        pw_hash = sha1(password.encode("utf-8")).hexdigest()  # nosec
130        result = self.check_hibp(pw_hash[:5])
131        final_count = 0
132        for line in result.split("\r\n"):
133            full_hash, count = line.split(":")
134            if pw_hash[5:] == full_hash.lower():
135                final_count = int(count)
136        LOGGER.debug("got hibp result", count=final_count, hash=pw_hash[:5])
137        if final_count > self.hibp_allowed_count:
138            LOGGER.debug("password failed", check="hibp", count=final_count)
139            message = _("Password exists on {count} online lists.".format(count=final_count))
140            return PolicyResult(False, message)
141        return PolicyResult(True)

Check if password is in HIBP DB. Hashes given Password with SHA1, uses the first 5 characters of Password in request and checks if full hash is in response. Returns 0 if Password is not in result otherwise the count of how many times it was used.

def passes_zxcvbn( self, password: str, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyResult:
143    def passes_zxcvbn(self, password: str, request: PolicyRequest) -> PolicyResult:
144        """Check Dropbox's zxcvbn password estimator"""
145        user_inputs = []
146        if request.user.is_authenticated:
147            user_inputs.append(request.user.username)
148            user_inputs.append(request.user.name)
149            user_inputs.append(request.user.email)
150        if request.http_request:
151            user_inputs.append(request.http_request.brand.branding_title)
152        # Only calculate result for the first 72 characters, as with over 100 char
153        # long passwords we can be reasonably sure that they'll surpass the score anyways
154        # See https://github.com/dropbox/zxcvbn#runtime-latency
155        results = zxcvbn(password[:72], user_inputs)
156        LOGGER.debug("password failed", check="zxcvbn", score=results["score"])
157        result = PolicyResult(results["score"] > self.zxcvbn_score_threshold)
158        if not result.passing:
159            result.messages += tuple((_("Password is too weak."),))
160        if isinstance(results["feedback"]["warning"], list):
161            result.messages += tuple(results["feedback"]["warning"])
162        if isinstance(results["feedback"]["suggestions"], list):
163            result.messages += tuple(results["feedback"]["suggestions"])
164        return result

Check Dropbox's zxcvbn password estimator

policy_ptr_id
policy_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class PasswordPolicy.DoesNotExist(authentik.policies.models.Policy.DoesNotExist):

The requested object does not exist

class PasswordPolicy.MultipleObjectsReturned(authentik.policies.models.Policy.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.