authentik.policies.geoip.models
GeoIP policy
1"""GeoIP policy""" 2 3from itertools import chain 4 5from django.contrib.postgres.fields import ArrayField 6from django.db import models 7from django.utils.timezone import now 8from django.utils.translation import gettext as _ 9from django_countries.fields import CountryField 10from geopy import distance 11from rest_framework.serializers import BaseSerializer 12 13from authentik.events.context_processors.geoip import GeoIPDict 14from authentik.events.models import Event, EventAction 15from authentik.policies.exceptions import PolicyException 16from authentik.policies.geoip.exceptions import GeoIPNotFoundException 17from authentik.policies.models import Policy 18from authentik.policies.types import PolicyRequest, PolicyResult 19 20MAX_DISTANCE_HOUR_KM = 1000 21 22 23class GeoIPPolicy(Policy): 24 """Ensure the user satisfies requirements of geography or network topology, based on IP 25 address.""" 26 27 asns = ArrayField(models.IntegerField(), blank=True, default=list) 28 countries = CountryField(multiple=True, blank=True) 29 30 distance_tolerance_km = models.PositiveIntegerField(default=50) 31 32 check_history_distance = models.BooleanField(default=False) 33 history_max_distance_km = models.PositiveBigIntegerField(default=100) 34 history_login_count = models.PositiveIntegerField(default=5) 35 36 check_impossible_travel = models.BooleanField(default=False) 37 impossible_tolerance_km = models.PositiveIntegerField(default=100) 38 39 @property 40 def serializer(self) -> type[BaseSerializer]: 41 from authentik.policies.geoip.api import GeoIPPolicySerializer 42 43 return GeoIPPolicySerializer 44 45 @property 46 def component(self) -> str: # pragma: no cover 47 return "ak-policy-geoip-form" 48 49 def passes(self, request: PolicyRequest) -> PolicyResult: 50 """ 51 Passes if any of the following is true: 52 - the client IP is advertised by an autonomous system with ASN in the `asns` 53 - the client IP is geolocated in a country of `countries` 54 """ 55 static_results: list[PolicyResult] = [] 56 dynamic_results: list[PolicyResult] = [] 57 58 if self.asns: 59 static_results.append(self.passes_asn(request)) 60 if self.countries: 61 static_results.append(self.passes_country(request)) 62 63 if self.check_history_distance or self.check_impossible_travel: 64 dynamic_results.append(self.passes_distance(request)) 65 66 if not static_results and not dynamic_results: 67 return PolicyResult(True) 68 69 static_passing = any(r.passing for r in static_results) if static_results else True 70 dynamic_passing = all(r.passing for r in dynamic_results) 71 passing = static_passing and dynamic_passing 72 messages = chain( 73 *[r.messages for r in static_results], *[r.messages for r in dynamic_results] 74 ) 75 76 result = PolicyResult(passing, *messages) 77 result.source_results = list(chain(static_results, dynamic_results)) 78 79 return result 80 81 def passes_asn(self, request: PolicyRequest) -> PolicyResult: 82 # This is not a single get chain because `request.context` can contain `{ "asn": None }`. 83 asn_data = request.context.get("asn") 84 asn = asn_data.get("asn") if asn_data else None 85 86 if not asn: 87 raise PolicyException( 88 GeoIPNotFoundException(_("GeoIP: client IP not found in ASN database.")) 89 ) 90 91 if asn not in self.asns: 92 message = _("Client IP is not part of an allowed autonomous system.") 93 return PolicyResult(False, message) 94 95 return PolicyResult(True) 96 97 def passes_country(self, request: PolicyRequest) -> PolicyResult: 98 # This is not a single get chain because `request.context` can contain `{ "geoip": None }`. 99 geoip_data: GeoIPDict | None = request.context.get("geoip") 100 country = geoip_data.get("country") if geoip_data else None 101 102 if not country: 103 raise PolicyException( 104 GeoIPNotFoundException(_("GeoIP: client IP address not found in City database.")) 105 ) 106 107 if country not in self.countries: 108 message = _("Client IP is not in an allowed country.") 109 return PolicyResult(False, message) 110 111 return PolicyResult(True) 112 113 def passes_distance(self, request: PolicyRequest) -> PolicyResult: 114 """Check if current policy execution is out of distance range compared 115 to previous authentication requests""" 116 # Get previous login event and GeoIP data 117 previous_logins = Event.objects.filter( 118 action=EventAction.LOGIN, 119 user__pk=request.user.pk, # context__geo__isnull=False 120 ).order_by("-created")[: self.history_login_count] 121 _now = now() 122 geoip_data: GeoIPDict | None = request.context.get("geoip") 123 if not geoip_data: 124 return PolicyResult(False) 125 if not previous_logins.exists(): 126 return PolicyResult(True) 127 result = False 128 for previous_login in previous_logins: 129 if "geo" not in previous_login.context: 130 continue 131 previous_login_geoip: GeoIPDict = previous_login.context["geo"] 132 133 # Figure out distance 134 dist = distance.geodesic( 135 (previous_login_geoip["lat"], previous_login_geoip["long"]), 136 (geoip_data["lat"], geoip_data["long"]), 137 ) 138 if self.check_history_distance and dist.km >= ( 139 self.history_max_distance_km + self.distance_tolerance_km 140 ): 141 return PolicyResult( 142 False, _("Distance from previous authentication is larger than threshold.") 143 ) 144 # Check if distance between `previous_login` and now is more 145 # than max distance per hour times the amount of hours since the previous login 146 # (round down to the lowest closest time of hours) 147 # clamped to be at least 1 hour 148 rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 3600), 1) 149 if self.check_impossible_travel and dist.km >= ( 150 (MAX_DISTANCE_HOUR_KM * rel_time_hours) + self.distance_tolerance_km 151 ): 152 return PolicyResult(False, _("Distance is further than possible.")) 153 result = True 154 return PolicyResult(result) 155 156 class Meta(Policy.PolicyMeta): 157 verbose_name = _("GeoIP Policy") 158 verbose_name_plural = _("GeoIP Policies")
24class GeoIPPolicy(Policy): 25 """Ensure the user satisfies requirements of geography or network topology, based on IP 26 address.""" 27 28 asns = ArrayField(models.IntegerField(), blank=True, default=list) 29 countries = CountryField(multiple=True, blank=True) 30 31 distance_tolerance_km = models.PositiveIntegerField(default=50) 32 33 check_history_distance = models.BooleanField(default=False) 34 history_max_distance_km = models.PositiveBigIntegerField(default=100) 35 history_login_count = models.PositiveIntegerField(default=5) 36 37 check_impossible_travel = models.BooleanField(default=False) 38 impossible_tolerance_km = models.PositiveIntegerField(default=100) 39 40 @property 41 def serializer(self) -> type[BaseSerializer]: 42 from authentik.policies.geoip.api import GeoIPPolicySerializer 43 44 return GeoIPPolicySerializer 45 46 @property 47 def component(self) -> str: # pragma: no cover 48 return "ak-policy-geoip-form" 49 50 def passes(self, request: PolicyRequest) -> PolicyResult: 51 """ 52 Passes if any of the following is true: 53 - the client IP is advertised by an autonomous system with ASN in the `asns` 54 - the client IP is geolocated in a country of `countries` 55 """ 56 static_results: list[PolicyResult] = [] 57 dynamic_results: list[PolicyResult] = [] 58 59 if self.asns: 60 static_results.append(self.passes_asn(request)) 61 if self.countries: 62 static_results.append(self.passes_country(request)) 63 64 if self.check_history_distance or self.check_impossible_travel: 65 dynamic_results.append(self.passes_distance(request)) 66 67 if not static_results and not dynamic_results: 68 return PolicyResult(True) 69 70 static_passing = any(r.passing for r in static_results) if static_results else True 71 dynamic_passing = all(r.passing for r in dynamic_results) 72 passing = static_passing and dynamic_passing 73 messages = chain( 74 *[r.messages for r in static_results], *[r.messages for r in dynamic_results] 75 ) 76 77 result = PolicyResult(passing, *messages) 78 result.source_results = list(chain(static_results, dynamic_results)) 79 80 return result 81 82 def passes_asn(self, request: PolicyRequest) -> PolicyResult: 83 # This is not a single get chain because `request.context` can contain `{ "asn": None }`. 84 asn_data = request.context.get("asn") 85 asn = asn_data.get("asn") if asn_data else None 86 87 if not asn: 88 raise PolicyException( 89 GeoIPNotFoundException(_("GeoIP: client IP not found in ASN database.")) 90 ) 91 92 if asn not in self.asns: 93 message = _("Client IP is not part of an allowed autonomous system.") 94 return PolicyResult(False, message) 95 96 return PolicyResult(True) 97 98 def passes_country(self, request: PolicyRequest) -> PolicyResult: 99 # This is not a single get chain because `request.context` can contain `{ "geoip": None }`. 100 geoip_data: GeoIPDict | None = request.context.get("geoip") 101 country = geoip_data.get("country") if geoip_data else None 102 103 if not country: 104 raise PolicyException( 105 GeoIPNotFoundException(_("GeoIP: client IP address not found in City database.")) 106 ) 107 108 if country not in self.countries: 109 message = _("Client IP is not in an allowed country.") 110 return PolicyResult(False, message) 111 112 return PolicyResult(True) 113 114 def passes_distance(self, request: PolicyRequest) -> PolicyResult: 115 """Check if current policy execution is out of distance range compared 116 to previous authentication requests""" 117 # Get previous login event and GeoIP data 118 previous_logins = Event.objects.filter( 119 action=EventAction.LOGIN, 120 user__pk=request.user.pk, # context__geo__isnull=False 121 ).order_by("-created")[: self.history_login_count] 122 _now = now() 123 geoip_data: GeoIPDict | None = request.context.get("geoip") 124 if not geoip_data: 125 return PolicyResult(False) 126 if not previous_logins.exists(): 127 return PolicyResult(True) 128 result = False 129 for previous_login in previous_logins: 130 if "geo" not in previous_login.context: 131 continue 132 previous_login_geoip: GeoIPDict = previous_login.context["geo"] 133 134 # Figure out distance 135 dist = distance.geodesic( 136 (previous_login_geoip["lat"], previous_login_geoip["long"]), 137 (geoip_data["lat"], geoip_data["long"]), 138 ) 139 if self.check_history_distance and dist.km >= ( 140 self.history_max_distance_km + self.distance_tolerance_km 141 ): 142 return PolicyResult( 143 False, _("Distance from previous authentication is larger than threshold.") 144 ) 145 # Check if distance between `previous_login` and now is more 146 # than max distance per hour times the amount of hours since the previous login 147 # (round down to the lowest closest time of hours) 148 # clamped to be at least 1 hour 149 rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 3600), 1) 150 if self.check_impossible_travel and dist.km >= ( 151 (MAX_DISTANCE_HOUR_KM * rel_time_hours) + self.distance_tolerance_km 152 ): 153 return PolicyResult(False, _("Distance is further than possible.")) 154 result = True 155 return PolicyResult(result) 156 157 class Meta(Policy.PolicyMeta): 158 verbose_name = _("GeoIP Policy") 159 verbose_name_plural = _("GeoIP Policies")
Ensure the user satisfies requirements of geography or network topology, based on IP address.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A descriptor for country fields on a model instance. Returns a Country when accessed so you can do things like::
>>> from people import Person
>>> person = Person.object.get(name='Chris')
>>> person.country.name
'New Zealand'
>>> person.country.flag
'/static/flags/nz.gif'
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
A wrapper for a deferred-loading field. When the value is read from this object the first time, the query is executed.
40 @property 41 def serializer(self) -> type[BaseSerializer]: 42 from authentik.policies.geoip.api import GeoIPPolicySerializer 43 44 return GeoIPPolicySerializer
Get serializer for this model
50 def passes(self, request: PolicyRequest) -> PolicyResult: 51 """ 52 Passes if any of the following is true: 53 - the client IP is advertised by an autonomous system with ASN in the `asns` 54 - the client IP is geolocated in a country of `countries` 55 """ 56 static_results: list[PolicyResult] = [] 57 dynamic_results: list[PolicyResult] = [] 58 59 if self.asns: 60 static_results.append(self.passes_asn(request)) 61 if self.countries: 62 static_results.append(self.passes_country(request)) 63 64 if self.check_history_distance or self.check_impossible_travel: 65 dynamic_results.append(self.passes_distance(request)) 66 67 if not static_results and not dynamic_results: 68 return PolicyResult(True) 69 70 static_passing = any(r.passing for r in static_results) if static_results else True 71 dynamic_passing = all(r.passing for r in dynamic_results) 72 passing = static_passing and dynamic_passing 73 messages = chain( 74 *[r.messages for r in static_results], *[r.messages for r in dynamic_results] 75 ) 76 77 result = PolicyResult(passing, *messages) 78 result.source_results = list(chain(static_results, dynamic_results)) 79 80 return result
82 def passes_asn(self, request: PolicyRequest) -> PolicyResult: 83 # This is not a single get chain because `request.context` can contain `{ "asn": None }`. 84 asn_data = request.context.get("asn") 85 asn = asn_data.get("asn") if asn_data else None 86 87 if not asn: 88 raise PolicyException( 89 GeoIPNotFoundException(_("GeoIP: client IP not found in ASN database.")) 90 ) 91 92 if asn not in self.asns: 93 message = _("Client IP is not part of an allowed autonomous system.") 94 return PolicyResult(False, message) 95 96 return PolicyResult(True)
98 def passes_country(self, request: PolicyRequest) -> PolicyResult: 99 # This is not a single get chain because `request.context` can contain `{ "geoip": None }`. 100 geoip_data: GeoIPDict | None = request.context.get("geoip") 101 country = geoip_data.get("country") if geoip_data else None 102 103 if not country: 104 raise PolicyException( 105 GeoIPNotFoundException(_("GeoIP: client IP address not found in City database.")) 106 ) 107 108 if country not in self.countries: 109 message = _("Client IP is not in an allowed country.") 110 return PolicyResult(False, message) 111 112 return PolicyResult(True)
114 def passes_distance(self, request: PolicyRequest) -> PolicyResult: 115 """Check if current policy execution is out of distance range compared 116 to previous authentication requests""" 117 # Get previous login event and GeoIP data 118 previous_logins = Event.objects.filter( 119 action=EventAction.LOGIN, 120 user__pk=request.user.pk, # context__geo__isnull=False 121 ).order_by("-created")[: self.history_login_count] 122 _now = now() 123 geoip_data: GeoIPDict | None = request.context.get("geoip") 124 if not geoip_data: 125 return PolicyResult(False) 126 if not previous_logins.exists(): 127 return PolicyResult(True) 128 result = False 129 for previous_login in previous_logins: 130 if "geo" not in previous_login.context: 131 continue 132 previous_login_geoip: GeoIPDict = previous_login.context["geo"] 133 134 # Figure out distance 135 dist = distance.geodesic( 136 (previous_login_geoip["lat"], previous_login_geoip["long"]), 137 (geoip_data["lat"], geoip_data["long"]), 138 ) 139 if self.check_history_distance and dist.km >= ( 140 self.history_max_distance_km + self.distance_tolerance_km 141 ): 142 return PolicyResult( 143 False, _("Distance from previous authentication is larger than threshold.") 144 ) 145 # Check if distance between `previous_login` and now is more 146 # than max distance per hour times the amount of hours since the previous login 147 # (round down to the lowest closest time of hours) 148 # clamped to be at least 1 hour 149 rel_time_hours = max(int((_now - previous_login.created).total_seconds() / 3600), 1) 150 if self.check_impossible_travel and dist.km >= ( 151 (MAX_DISTANCE_HOUR_KM * rel_time_hours) + self.distance_tolerance_km 152 ): 153 return PolicyResult(False, _("Distance is further than possible.")) 154 result = True 155 return PolicyResult(result)
Check if current policy execution is out of distance range compared to previous authentication requests
Method descriptor with partial application of the given arguments and keywords.
Supports wrapping existing descriptors and handles non-descriptor callables as instance methods.
Accessor to the related object on the forward side of a one-to-one relation.
In the example::
class Restaurant(Model):
place = OneToOneField(Place, related_name='restaurant')
Restaurant.place is a ForwardOneToOneDescriptor instance.
Inherited Members
- authentik.policies.models.Policy
- policy_uuid
- name
- execution_logging
- objects
- PolicyMeta
- created
- last_updated
- get_next_by_created
- get_previous_by_created
- get_next_by_last_updated
- get_previous_by_last_updated
- bindings
- dummypolicy
- eventmatcherpolicy
- passwordexpirypolicy
- reputationpolicy
- expressionpolicy
- geoippolicy
- promptstage_set
- passwordpolicy
- uniquepasswordpolicy
The requested object does not exist
The query returned multiple objects when only one was expected.