authentik.policies.geoip.models

GeoIP policy

  1"""GeoIP policy"""
  2
  3from itertools import chain
  4
  5from django.contrib.postgres.fields import ArrayField
  6from django.db import models
  7from django.utils.timezone import now
  8from django.utils.translation import gettext as _
  9from django_countries.fields import CountryField
 10from geopy import distance
 11from rest_framework.serializers import BaseSerializer
 12
 13from authentik.events.context_processors.geoip import GeoIPDict
 14from authentik.events.models import Event, EventAction
 15from authentik.policies.exceptions import PolicyException
 16from authentik.policies.geoip.exceptions import GeoIPNotFoundException
 17from authentik.policies.models import Policy
 18from authentik.policies.types import PolicyRequest, PolicyResult
 19
# Distance (km) a user is assumed to be able to cover per hour; multiplied by the hours
# elapsed since a previous login in the impossible-travel check of `GeoIPPolicy`.
MAX_DISTANCE_HOUR_KM = 1000
 21
 22
 23class GeoIPPolicy(Policy):
 24    """Ensure the user satisfies requirements of geography or network topology, based on IP
 25    address."""
 26
 27    asns = ArrayField(models.IntegerField(), blank=True, default=list)
 28    countries = CountryField(multiple=True, blank=True)
 29
 30    distance_tolerance_km = models.PositiveIntegerField(default=50)
 31
 32    check_history_distance = models.BooleanField(default=False)
 33    history_max_distance_km = models.PositiveBigIntegerField(default=100)
 34    history_login_count = models.PositiveIntegerField(default=5)
 35
 36    check_impossible_travel = models.BooleanField(default=False)
 37    impossible_tolerance_km = models.PositiveIntegerField(default=100)
 38
 39    @property
 40    def serializer(self) -> type[BaseSerializer]:
 41        from authentik.policies.geoip.api import GeoIPPolicySerializer
 42
 43        return GeoIPPolicySerializer
 44
 45    @property
 46    def component(self) -> str:  # pragma: no cover
 47        return "ak-policy-geoip-form"
 48
 49    def passes(self, request: PolicyRequest) -> PolicyResult:
 50        """
 51        Passes if any of the following is true:
 52        - the client IP is advertised by an autonomous system with ASN in the `asns`
 53        - the client IP is geolocated in a country of `countries`
 54        """
 55        static_results: list[PolicyResult] = []
 56        dynamic_results: list[PolicyResult] = []
 57
 58        if self.asns:
 59            static_results.append(self.passes_asn(request))
 60        if self.countries:
 61            static_results.append(self.passes_country(request))
 62
 63        if self.check_history_distance or self.check_impossible_travel:
 64            dynamic_results.append(self.passes_distance(request))
 65
 66        if not static_results and not dynamic_results:
 67            return PolicyResult(True)
 68
 69        static_passing = any(r.passing for r in static_results) if static_results else True
 70        dynamic_passing = all(r.passing for r in dynamic_results)
 71        passing = static_passing and dynamic_passing
 72        messages = chain(
 73            *[r.messages for r in static_results], *[r.messages for r in dynamic_results]
 74        )
 75
 76        result = PolicyResult(passing, *messages)
 77        result.source_results = list(chain(static_results, dynamic_results))
 78
 79        return result
 80
 81    def passes_asn(self, request: PolicyRequest) -> PolicyResult:
 82        # This is not a single get chain because `request.context` can contain `{ "asn": None }`.
 83        asn_data = request.context.get("asn")
 84        asn = asn_data.get("asn") if asn_data else None
 85
 86        if not asn:
 87            raise PolicyException(
 88                GeoIPNotFoundException(_("GeoIP: client IP not found in ASN database."))
 89            )
 90
 91        if asn not in self.asns:
 92            message = _("Client IP is not part of an allowed autonomous system.")
 93            return PolicyResult(False, message)
 94
 95        return PolicyResult(True)
 96
 97    def passes_country(self, request: PolicyRequest) -> PolicyResult:
 98        # This is not a single get chain because `request.context` can contain `{ "geoip": None }`.
 99        geoip_data: GeoIPDict | None = request.context.get("geoip")
100        country = geoip_data.get("country") if geoip_data else None
101
102        if not country:
103            raise PolicyException(
104                GeoIPNotFoundException(_("GeoIP: client IP address not found in City database."))
105            )
106
107        if country not in self.countries:
108            message = _("Client IP is not in an allowed country.")
109            return PolicyResult(False, message)
110
111        return PolicyResult(True)
112
113    def passes_distance(self, request: PolicyRequest) -> PolicyResult:
114        """Check if current policy execution is out of distance range compared
115        to previous authentication requests"""
116        # Get previous login event and GeoIP data
117        previous_logins = Event.objects.filter(
118            action=EventAction.LOGIN,
119            user__pk=request.user.pk,  # context__geo__isnull=False
120        ).order_by("-created")[: self.history_login_count]
121        _now = now()
122        geoip_data: GeoIPDict | None = request.context.get("geoip")
123        if not geoip_data:
124            return PolicyResult(False)
125        if not previous_logins.exists():
126            return PolicyResult(True)
127        result = False
128        for previous_login in previous_logins:
129            if "geo" not in previous_login.context:
130                continue
131            previous_login_geoip: GeoIPDict = previous_login.context["geo"]
132
133            # Figure out distance
134            dist = distance.geodesic(
135                (previous_login_geoip["lat"], previous_login_geoip["long"]),
136                (geoip_data["lat"], geoip_data["long"]),
137            )
138            if self.check_history_distance and dist.km >= (
139                self.history_max_distance_km + self.distance_tolerance_km
140            ):
141                return PolicyResult(
142                    False, _("Distance from previous authentication is larger than threshold.")
143                )
144            # Check if distance between `previous_login` and now is more
145            # than max distance per hour times the amount of hours since the previous login
146            # (round down to the lowest closest time of hours)
147            # clamped to be at least 1 hour
148            rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 3600), 1)
149            if self.check_impossible_travel and dist.km >= (
150                (MAX_DISTANCE_HOUR_KM * rel_time_hours) + self.distance_tolerance_km
151            ):
152                return PolicyResult(False, _("Distance is further than possible."))
153            result = True
154        return PolicyResult(result)
155
156    class Meta(Policy.PolicyMeta):
157        verbose_name = _("GeoIP Policy")
158        verbose_name_plural = _("GeoIP Policies")
# Distance (km) a user is assumed to be able to cover per hour; multiplied by the hours
# elapsed since a previous login in the impossible-travel check of `GeoIPPolicy`.
MAX_DISTANCE_HOUR_KM = 1000
class GeoIPPolicy(authentik.policies.models.Policy):
 24class GeoIPPolicy(Policy):
 25    """Ensure the user satisfies requirements of geography or network topology, based on IP
 26    address."""
 27
 28    asns = ArrayField(models.IntegerField(), blank=True, default=list)
 29    countries = CountryField(multiple=True, blank=True)
 30
 31    distance_tolerance_km = models.PositiveIntegerField(default=50)
 32
 33    check_history_distance = models.BooleanField(default=False)
 34    history_max_distance_km = models.PositiveBigIntegerField(default=100)
 35    history_login_count = models.PositiveIntegerField(default=5)
 36
 37    check_impossible_travel = models.BooleanField(default=False)
 38    impossible_tolerance_km = models.PositiveIntegerField(default=100)
 39
 40    @property
 41    def serializer(self) -> type[BaseSerializer]:
 42        from authentik.policies.geoip.api import GeoIPPolicySerializer
 43
 44        return GeoIPPolicySerializer
 45
 46    @property
 47    def component(self) -> str:  # pragma: no cover
 48        return "ak-policy-geoip-form"
 49
 50    def passes(self, request: PolicyRequest) -> PolicyResult:
 51        """
 52        Passes if any of the following is true:
 53        - the client IP is advertised by an autonomous system with ASN in the `asns`
 54        - the client IP is geolocated in a country of `countries`
 55        """
 56        static_results: list[PolicyResult] = []
 57        dynamic_results: list[PolicyResult] = []
 58
 59        if self.asns:
 60            static_results.append(self.passes_asn(request))
 61        if self.countries:
 62            static_results.append(self.passes_country(request))
 63
 64        if self.check_history_distance or self.check_impossible_travel:
 65            dynamic_results.append(self.passes_distance(request))
 66
 67        if not static_results and not dynamic_results:
 68            return PolicyResult(True)
 69
 70        static_passing = any(r.passing for r in static_results) if static_results else True
 71        dynamic_passing = all(r.passing for r in dynamic_results)
 72        passing = static_passing and dynamic_passing
 73        messages = chain(
 74            *[r.messages for r in static_results], *[r.messages for r in dynamic_results]
 75        )
 76
 77        result = PolicyResult(passing, *messages)
 78        result.source_results = list(chain(static_results, dynamic_results))
 79
 80        return result
 81
 82    def passes_asn(self, request: PolicyRequest) -> PolicyResult:
 83        # This is not a single get chain because `request.context` can contain `{ "asn": None }`.
 84        asn_data = request.context.get("asn")
 85        asn = asn_data.get("asn") if asn_data else None
 86
 87        if not asn:
 88            raise PolicyException(
 89                GeoIPNotFoundException(_("GeoIP: client IP not found in ASN database."))
 90            )
 91
 92        if asn not in self.asns:
 93            message = _("Client IP is not part of an allowed autonomous system.")
 94            return PolicyResult(False, message)
 95
 96        return PolicyResult(True)
 97
 98    def passes_country(self, request: PolicyRequest) -> PolicyResult:
 99        # This is not a single get chain because `request.context` can contain `{ "geoip": None }`.
100        geoip_data: GeoIPDict | None = request.context.get("geoip")
101        country = geoip_data.get("country") if geoip_data else None
102
103        if not country:
104            raise PolicyException(
105                GeoIPNotFoundException(_("GeoIP: client IP address not found in City database."))
106            )
107
108        if country not in self.countries:
109            message = _("Client IP is not in an allowed country.")
110            return PolicyResult(False, message)
111
112        return PolicyResult(True)
113
114    def passes_distance(self, request: PolicyRequest) -> PolicyResult:
115        """Check if current policy execution is out of distance range compared
116        to previous authentication requests"""
117        # Get previous login event and GeoIP data
118        previous_logins = Event.objects.filter(
119            action=EventAction.LOGIN,
120            user__pk=request.user.pk,  # context__geo__isnull=False
121        ).order_by("-created")[: self.history_login_count]
122        _now = now()
123        geoip_data: GeoIPDict | None = request.context.get("geoip")
124        if not geoip_data:
125            return PolicyResult(False)
126        if not previous_logins.exists():
127            return PolicyResult(True)
128        result = False
129        for previous_login in previous_logins:
130            if "geo" not in previous_login.context:
131                continue
132            previous_login_geoip: GeoIPDict = previous_login.context["geo"]
133
134            # Figure out distance
135            dist = distance.geodesic(
136                (previous_login_geoip["lat"], previous_login_geoip["long"]),
137                (geoip_data["lat"], geoip_data["long"]),
138            )
139            if self.check_history_distance and dist.km >= (
140                self.history_max_distance_km + self.distance_tolerance_km
141            ):
142                return PolicyResult(
143                    False, _("Distance from previous authentication is larger than threshold.")
144                )
145            # Check if distance between `previous_login` and now is more
146            # than max distance per hour times the amount of hours since the previous login
147            # (round down to the lowest closest time of hours)
148            # clamped to be at least 1 hour
149            rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 3600), 1)
150            if self.check_impossible_travel and dist.km >= (
151                (MAX_DISTANCE_HOUR_KM * rel_time_hours) + self.distance_tolerance_km
152            ):
153                return PolicyResult(False, _("Distance is further than possible."))
154            result = True
155        return PolicyResult(result)
156
157    class Meta(Policy.PolicyMeta):
158        verbose_name = _("GeoIP Policy")
159        verbose_name_plural = _("GeoIP Policies")

Ensure the user satisfies requirements of geography or network topology, based on IP address.

def asns(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

countries

A descriptor for country fields on a model instance. Returns a Country when accessed so you can do things like::

>>> from people import Person
>>> person = Person.objects.get(name='Chris')

>>> person.country.name
'New Zealand'

>>> person.country.flag
'/static/flags/nz.gif'
def distance_tolerance_km(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def check_history_distance(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def history_max_distance_km(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def history_login_count(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def check_impossible_travel(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

def impossible_tolerance_km(unknown):

A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.

serializer: type[rest_framework.serializers.BaseSerializer]
    @property
    def serializer(self) -> type[BaseSerializer]:
        """Return the serializer class for this policy type."""
        # Imported at call time instead of at module level.
        # NOTE(review): presumably avoids a circular import with the API module - confirm.
        from authentik.policies.geoip.api import GeoIPPolicySerializer

        return GeoIPPolicySerializer

Get serializer for this model

component: str
    @property
    def component(self) -> str:  # pragma: no cover
        """Return the name of the UI component used to edit this policy."""
        return "ak-policy-geoip-form"

Return component used to edit this object

50    def passes(self, request: PolicyRequest) -> PolicyResult:
51        """
52        Passes if any of the following is true:
53        - the client IP is advertised by an autonomous system with ASN in the `asns`
54        - the client IP is geolocated in a country of `countries`
55        """
56        static_results: list[PolicyResult] = []
57        dynamic_results: list[PolicyResult] = []
58
59        if self.asns:
60            static_results.append(self.passes_asn(request))
61        if self.countries:
62            static_results.append(self.passes_country(request))
63
64        if self.check_history_distance or self.check_impossible_travel:
65            dynamic_results.append(self.passes_distance(request))
66
67        if not static_results and not dynamic_results:
68            return PolicyResult(True)
69
70        static_passing = any(r.passing for r in static_results) if static_results else True
71        dynamic_passing = all(r.passing for r in dynamic_results)
72        passing = static_passing and dynamic_passing
73        messages = chain(
74            *[r.messages for r in static_results], *[r.messages for r in dynamic_results]
75        )
76
77        result = PolicyResult(passing, *messages)
78        result.source_results = list(chain(static_results, dynamic_results))
79
80        return result

Passes if any of the following is true:

  • the client IP is advertised by an autonomous system whose ASN is listed in `asns`
  • the client IP is geolocated in a country listed in `countries`
def passes_asn( self, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyResult:
82    def passes_asn(self, request: PolicyRequest) -> PolicyResult:
83        # This is not a single get chain because `request.context` can contain `{ "asn": None }`.
84        asn_data = request.context.get("asn")
85        asn = asn_data.get("asn") if asn_data else None
86
87        if not asn:
88            raise PolicyException(
89                GeoIPNotFoundException(_("GeoIP: client IP not found in ASN database."))
90            )
91
92        if asn not in self.asns:
93            message = _("Client IP is not part of an allowed autonomous system.")
94            return PolicyResult(False, message)
95
96        return PolicyResult(True)
def passes_country( self, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyResult:
 98    def passes_country(self, request: PolicyRequest) -> PolicyResult:
 99        # This is not a single get chain because `request.context` can contain `{ "geoip": None }`.
100        geoip_data: GeoIPDict | None = request.context.get("geoip")
101        country = geoip_data.get("country") if geoip_data else None
102
103        if not country:
104            raise PolicyException(
105                GeoIPNotFoundException(_("GeoIP: client IP address not found in City database."))
106            )
107
108        if country not in self.countries:
109            message = _("Client IP is not in an allowed country.")
110            return PolicyResult(False, message)
111
112        return PolicyResult(True)
def passes_distance( self, request: authentik.policies.types.PolicyRequest) -> authentik.policies.types.PolicyResult:
114    def passes_distance(self, request: PolicyRequest) -> PolicyResult:
115        """Check if current policy execution is out of distance range compared
116        to previous authentication requests"""
117        # Get previous login event and GeoIP data
118        previous_logins = Event.objects.filter(
119            action=EventAction.LOGIN,
120            user__pk=request.user.pk,  # context__geo__isnull=False
121        ).order_by("-created")[: self.history_login_count]
122        _now = now()
123        geoip_data: GeoIPDict | None = request.context.get("geoip")
124        if not geoip_data:
125            return PolicyResult(False)
126        if not previous_logins.exists():
127            return PolicyResult(True)
128        result = False
129        for previous_login in previous_logins:
130            if "geo" not in previous_login.context:
131                continue
132            previous_login_geoip: GeoIPDict = previous_login.context["geo"]
133
134            # Figure out distance
135            dist = distance.geodesic(
136                (previous_login_geoip["lat"], previous_login_geoip["long"]),
137                (geoip_data["lat"], geoip_data["long"]),
138            )
139            if self.check_history_distance and dist.km >= (
140                self.history_max_distance_km + self.distance_tolerance_km
141            ):
142                return PolicyResult(
143                    False, _("Distance from previous authentication is larger than threshold.")
144                )
145            # Check if distance between `previous_login` and now is more
146            # than max distance per hour times the amount of hours since the previous login
147            # (round down to the lowest closest time of hours)
148            # clamped to be at least 1 hour
149            rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 3600), 1)
150            if self.check_impossible_travel and dist.km >= (
151                (MAX_DISTANCE_HOUR_KM * rel_time_hours) + self.distance_tolerance_km
152            ):
153                return PolicyResult(False, _("Distance is further than possible."))
154            result = True
155        return PolicyResult(result)

Check if current policy execution is out of distance range compared to previous authentication requests

def get_countries_display(unknown):

Method descriptor with partial application of the given arguments and keywords.

Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.

policy_ptr_id
policy_ptr

Accessor to the related object on the forward side of a one-to-one relation.

In the example::

class Restaurant(Model):
    place = OneToOneField(Place, related_name='restaurant')

Restaurant.place is a ForwardOneToOneDescriptor instance.

class GeoIPPolicy.DoesNotExist(authentik.policies.models.Policy.DoesNotExist):

The requested object does not exist

class GeoIPPolicy.MultipleObjectsReturned(authentik.policies.models.Policy.MultipleObjectsReturned):

The query returned multiple objects when only one was expected.