authentik.events.context_processors.geoip
events GeoIP Reader
"""events GeoIP Reader"""

from typing import TYPE_CHECKING, TypedDict

from django.http import HttpRequest
from geoip2.errors import GeoIP2Error
from geoip2.models import City
from sentry_sdk import start_span

from authentik.events.context_processors.mmdb import MMDBContextProcessor
from authentik.lib.config import CONFIG
from authentik.root.middleware import ClientIPMiddleware

if TYPE_CHECKING:
    from authentik.api.v3.config import Capabilities
    from authentik.events.models import Event


class GeoIPDict(TypedDict):
    """GeoIP Details"""

    continent: str | None
    country: str | None
    lat: float | None
    long: float | None
    city: str


class GeoIPContextProcessor(MMDBContextProcessor):
    """Slim wrapper around GeoIP API"""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Imported at call time; at module level the name is only imported
        # under TYPE_CHECKING.
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_GEO_IP

    def path(self) -> str | None:
        """Return the value configured under `events.context_processors.geoip`"""
        return CONFIG.get("events.context_processors.geoip")

    def enrich_event(self, event: Event):
        """Store GeoIP details for the event's client IP under `event.context["geo"]`.

        The event is left untouched when the lookup yields nothing.
        """
        city = self.city_dict(event.client_ip)
        if not city:
            return
        event.context["geo"] = city

    def enrich_context(self, request: HttpRequest) -> dict:
        """Return `{"geoip": <details>}` for the request's client IP.

        The value is None when the lookup yields nothing.
        """
        # Different key `geoip` vs `geo` for legacy reasons
        return {"geoip": self.city_dict(ClientIPMiddleware.get_client_ip(request))}

    def city(self, ip_address: str) -> City | None:
        """Wrapper for Reader.city

        Returns None when the processor is not configured, or when the reader
        raises GeoIP2Error or ValueError for `ip_address`.
        """
        with start_span(
            op="authentik.events.geo.city",
            name=ip_address,
        ):
            if not self.configured():
                return None
            self.check_expired()
            try:
                return self.reader.city(ip_address)
            # Parenthesised tuple: the bare `except A, B:` spelling is only
            # accepted by Python 3.14+ (PEP 758); this form parses everywhere.
            except (GeoIP2Error, ValueError):
                return None

    def city_to_dict(self, city: City | None) -> GeoIPDict | dict:
        """Convert City to dict

        A falsy `city` gives an empty dict. Otherwise the result always has the
        `city` key, which falls back to "" when the record has no city name.
        """
        if not city:
            return {}
        city_dict: GeoIPDict = {
            "continent": city.continent.code,
            "country": city.country.iso_code,
            "lat": city.location.latitude,
            "long": city.location.longitude,
            "city": "",
        }
        if city.city.name:
            city_dict["city"] = city.city.name
        return city_dict

    def city_dict(self, ip_address: str) -> GeoIPDict | None:
        """Wrapper for self.city that returns a dict (None when nothing was found)"""
        city = self.city(ip_address)
        if not city:
            return None
        return self.city_to_dict(city)


GEOIP_CONTEXT_PROCESSOR = GeoIPContextProcessor()
class
GeoIPDict(typing.TypedDict):
20class GeoIPDict(TypedDict): 21 """GeoIP Details""" 22 23 continent: str | None 24 country: str | None 25 lat: float | None 26 long: float | None 27 city: str
GeoIP Details
class GeoIPContextProcessor(MMDBContextProcessor):
    """Slim wrapper around GeoIP API"""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Imported at call time rather than at module import.
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_GEO_IP

    def path(self) -> str | None:
        """Return the value configured under `events.context_processors.geoip`"""
        return CONFIG.get("events.context_processors.geoip")

    def enrich_event(self, event: Event):
        """Store GeoIP details for the event's client IP under `event.context["geo"]`.

        The event is left untouched when the lookup yields nothing.
        """
        city = self.city_dict(event.client_ip)
        if not city:
            return
        event.context["geo"] = city

    def enrich_context(self, request: HttpRequest) -> dict:
        """Return `{"geoip": <details>}` for the request's client IP.

        The value is None when the lookup yields nothing.
        """
        # Different key `geoip` vs `geo` for legacy reasons
        return {"geoip": self.city_dict(ClientIPMiddleware.get_client_ip(request))}

    def city(self, ip_address: str) -> City | None:
        """Wrapper for Reader.city

        Returns None when the processor is not configured, or when the reader
        raises GeoIP2Error or ValueError for `ip_address`.
        """
        with start_span(
            op="authentik.events.geo.city",
            name=ip_address,
        ):
            if not self.configured():
                return None
            self.check_expired()
            try:
                return self.reader.city(ip_address)
            # Parenthesised tuple: the bare `except A, B:` spelling is only
            # accepted by Python 3.14+ (PEP 758); this form parses everywhere.
            except (GeoIP2Error, ValueError):
                return None

    def city_to_dict(self, city: City | None) -> GeoIPDict | dict:
        """Convert City to dict

        A falsy `city` gives an empty dict. Otherwise the result always has the
        `city` key, which falls back to "" when the record has no city name.
        """
        if not city:
            return {}
        city_dict: GeoIPDict = {
            "continent": city.continent.code,
            "country": city.country.iso_code,
            "lat": city.location.latitude,
            "long": city.location.longitude,
            "city": "",
        }
        if city.city.name:
            city_dict["city"] = city.city.name
        return city_dict

    def city_dict(self, ip_address: str) -> GeoIPDict | None:
        """Wrapper for self.city that returns a dict (None when nothing was found)"""
        city = self.city(ip_address)
        if not city:
            return None
        return self.city_to_dict(city)
Slim wrapper around GeoIP API
def
capability(self) -> authentik.api.v3.config.Capabilities | None:
def capability(self) -> Capabilities | None:
    """Return the capability this context processor provides.

    This processor always reports `Capabilities.CAN_GEO_IP`.
    """
    # Resolved at call time; the module itself only imports `Capabilities`
    # under TYPE_CHECKING (presumably to avoid an import cycle).
    from authentik.api.v3.config import Capabilities as _Capabilities

    geoip_capability = _Capabilities.CAN_GEO_IP
    return geoip_capability
Return the capability this context processor provides
def
enrich_event(self, event: authentik.events.models.Event):
def enrich_event(self, event: Event):
    """Modify event: attach GeoIP details for its client IP.

    The details are stored under `event.context["geo"]`. When the lookup
    yields nothing the event is left unchanged.
    """
    geo_details = self.city_dict(event.client_ip)
    if geo_details:
        event.context["geo"] = geo_details
Modify event
def
enrich_context(self, request: django.http.request.HttpRequest) -> dict:
def enrich_context(self, request: HttpRequest) -> dict:
    """Modify context: expose GeoIP details for the request's client IP.

    Returns a one-entry dict keyed `geoip`; the value is whatever
    `city_dict` returns for the client IP (possibly None).
    """
    client_ip = ClientIPMiddleware.get_client_ip(request)
    geo_details = self.city_dict(client_ip)
    # Different key `geoip` vs `geo` for legacy reasons
    return {"geoip": geo_details}
Modify context
def
city(self, ip_address: str) -> geoip2.models.City | None:
def city(self, ip_address: str) -> City | None:
    """Wrapper for Reader.city

    The lookup runs inside an `authentik.events.geo.city` span named after
    the IP address.

    Returns the reader's City record for `ip_address`, or None when the
    processor is not configured or the reader raises GeoIP2Error or
    ValueError.
    """
    with start_span(
        op="authentik.events.geo.city",
        name=ip_address,
    ):
        if not self.configured():
            return None
        self.check_expired()
        try:
            return self.reader.city(ip_address)
        # Parenthesised tuple: the bare `except A, B:` spelling is only
        # accepted by Python 3.14+ (PEP 758); this form parses everywhere.
        except (GeoIP2Error, ValueError):
            return None
Wrapper for Reader.city
def city_to_dict(self, city: City | None) -> GeoIPDict | dict:
    """Convert City to dict.

    A falsy `city` produces an empty dict. Otherwise the result holds the
    continent code, country ISO code, latitude, longitude and city name,
    with the city name falling back to "" when the record has none.
    """
    if city:
        details: GeoIPDict = {
            "continent": city.continent.code,
            "country": city.country.iso_code,
            "lat": city.location.latitude,
            "long": city.location.longitude,
            # A missing or empty name is stored as ""
            "city": city.city.name or "",
        }
        return details
    return {}
Convert City to dict
GEOIP_CONTEXT_PROCESSOR =
<GeoIPContextProcessor object>