authentik.events.context_processors.geoip

events GeoIP Reader

 1"""events GeoIP Reader"""
 2
 3from typing import TYPE_CHECKING, TypedDict
 4
 5from django.http import HttpRequest
 6from geoip2.errors import GeoIP2Error
 7from geoip2.models import City
 8from sentry_sdk import start_span
 9
10from authentik.events.context_processors.mmdb import MMDBContextProcessor
11from authentik.lib.config import CONFIG
12from authentik.root.middleware import ClientIPMiddleware
13
14if TYPE_CHECKING:
15    from authentik.api.v3.config import Capabilities
16    from authentik.events.models import Event
17
18
class GeoIPDict(TypedDict):
    """GeoIP Details

    Dictionary form of a GeoIP city lookup, as built by
    `GeoIPContextProcessor.city_to_dict`."""

    # Continent code, taken from `city.continent.code`
    continent: str | None
    # Country ISO code, taken from `city.country.iso_code`
    country: str | None
    # Latitude, taken from `city.location.latitude`
    lat: float | None
    # Longitude, taken from `city.location.longitude`
    long: float | None
    # City name; empty string when the lookup carries no city name
    city: str
27
28
class GeoIPContextProcessor(MMDBContextProcessor):
    """Slim wrapper around GeoIP API

    Resolves IP addresses to city-level GeoIP details and attaches them to
    events (key `geo`) and request contexts (key `geoip`)."""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Local import: at module level this name is only imported under TYPE_CHECKING
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_GEO_IP

    def path(self) -> str | None:
        """Get the path to the MMDB file to load"""
        return CONFIG.get("events.context_processors.geoip")

    def enrich_event(self, event: Event):
        """Store GeoIP details for the event's client IP in `event.context["geo"]`

        The event is left untouched when no details are available."""
        details = self.city_dict(event.client_ip)
        if details:
            event.context["geo"] = details

    def enrich_context(self, request: HttpRequest) -> dict:
        """Return GeoIP details for the request's client IP under the `geoip` key

        The value is None when no details are available."""
        # Different key `geoip` vs `geo` for legacy reasons
        client_ip = ClientIPMiddleware.get_client_ip(request)
        return {"geoip": self.city_dict(client_ip)}

    def city(self, ip_address: str) -> City | None:
        """Wrapper for Reader.city

        Returns None when the processor is not configured, or when the lookup
        raises GeoIP2Error or ValueError."""
        with start_span(
            op="authentik.events.geo.city",
            name=ip_address,
        ):
            if not self.configured():
                return None
            # NOTE(review): check_expired() is not defined in this class; presumably
            # inherited and refreshes the reader -- confirm on the base class
            self.check_expired()
            try:
                return self.reader.city(ip_address)
            except (GeoIP2Error, ValueError):
                # Parenthesized tuple: the bare `except A, B:` spelling is only
                # accepted by Python 3.14+ (PEP 758)
                return None

    def city_to_dict(self, city: City | None) -> GeoIPDict | dict:
        """Convert City to dict

        Returns an empty dict when no City is given."""
        if not city:
            return {}
        details: GeoIPDict = {
            "continent": city.continent.code,
            "country": city.country.iso_code,
            "lat": city.location.latitude,
            "long": city.location.longitude,
            # Empty string when the city name is missing or empty
            "city": city.city.name or "",
        }
        return details

    def city_dict(self, ip_address: str) -> GeoIPDict | None:
        """Wrapper for self.city that returns a dict, or None when the lookup fails"""
        found = self.city(ip_address)
        return self.city_to_dict(found) if found else None
85
86
# Module-level GeoIPContextProcessor instance, created when this module is imported
GEOIP_CONTEXT_PROCESSOR = GeoIPContextProcessor()
class GeoIPDict(typing.TypedDict):
class GeoIPDict(TypedDict):
    """GeoIP Details

    Dictionary form of a GeoIP city lookup, as built by
    `GeoIPContextProcessor.city_to_dict`."""

    # Continent code, taken from `city.continent.code`
    continent: str | None
    # Country ISO code, taken from `city.country.iso_code`
    country: str | None
    # Latitude, taken from `city.location.latitude`
    lat: float | None
    # Longitude, taken from `city.location.longitude`
    long: float | None
    # City name; empty string when the lookup carries no city name
    city: str

GeoIP Details

continent: str | None
country: str | None
lat: float | None
long: float | None
city: str
class GeoIPContextProcessor(authentik.events.context_processors.mmdb.MMDBContextProcessor):
class GeoIPContextProcessor(MMDBContextProcessor):
    """Slim wrapper around GeoIP API

    Resolves IP addresses to city-level GeoIP details and attaches them to
    events (key `geo`) and request contexts (key `geoip`)."""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Local import: at module level this name is only imported under TYPE_CHECKING
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_GEO_IP

    def path(self) -> str | None:
        """Get the path to the MMDB file to load"""
        return CONFIG.get("events.context_processors.geoip")

    def enrich_event(self, event: Event):
        """Store GeoIP details for the event's client IP in `event.context["geo"]`

        The event is left untouched when no details are available."""
        details = self.city_dict(event.client_ip)
        if details:
            event.context["geo"] = details

    def enrich_context(self, request: HttpRequest) -> dict:
        """Return GeoIP details for the request's client IP under the `geoip` key

        The value is None when no details are available."""
        # Different key `geoip` vs `geo` for legacy reasons
        client_ip = ClientIPMiddleware.get_client_ip(request)
        return {"geoip": self.city_dict(client_ip)}

    def city(self, ip_address: str) -> City | None:
        """Wrapper for Reader.city

        Returns None when the processor is not configured, or when the lookup
        raises GeoIP2Error or ValueError."""
        with start_span(
            op="authentik.events.geo.city",
            name=ip_address,
        ):
            if not self.configured():
                return None
            # NOTE(review): check_expired() is not defined in this class; presumably
            # inherited and refreshes the reader -- confirm on the base class
            self.check_expired()
            try:
                return self.reader.city(ip_address)
            except (GeoIP2Error, ValueError):
                # Parenthesized tuple: the bare `except A, B:` spelling is only
                # accepted by Python 3.14+ (PEP 758)
                return None

    def city_to_dict(self, city: City | None) -> GeoIPDict | dict:
        """Convert City to dict

        Returns an empty dict when no City is given."""
        if not city:
            return {}
        details: GeoIPDict = {
            "continent": city.continent.code,
            "country": city.country.iso_code,
            "lat": city.location.latitude,
            "long": city.location.longitude,
            # Empty string when the city name is missing or empty
            "city": city.city.name or "",
        }
        return details

    def city_dict(self, ip_address: str) -> GeoIPDict | None:
        """Wrapper for self.city that returns a dict, or None when the lookup fails"""
        found = self.city(ip_address)
        return self.city_to_dict(found) if found else None

Slim wrapper around GeoIP API

def capability(self) -> authentik.api.v3.config.Capabilities | None:
33    def capability(self) -> Capabilities | None:
34        from authentik.api.v3.config import Capabilities
35
36        return Capabilities.CAN_GEO_IP

Return the capability this context processor provides

def path(self) -> str | None:
38    def path(self) -> str | None:
39        return CONFIG.get("events.context_processors.geoip")

Get the path to the MMDB file to load

def enrich_event(self, event: authentik.events.models.Event):
41    def enrich_event(self, event: Event):
42        city = self.city_dict(event.client_ip)
43        if not city:
44            return
45        event.context["geo"] = city

Modify event

def enrich_context(self, request: django.http.request.HttpRequest) -> dict:
47    def enrich_context(self, request: HttpRequest) -> dict:
48        # Different key `geoip` vs `geo` for legacy reasons
49        return {"geoip": self.city_dict(ClientIPMiddleware.get_client_ip(request))}

Modify context

def city(self, ip_address: str) -> geoip2.models.City | None:
51    def city(self, ip_address: str) -> City | None:
52        """Wrapper for Reader.city"""
53        with start_span(
54            op="authentik.events.geo.city",
55            name=ip_address,
56        ):
57            if not self.configured():
58                return None
59            self.check_expired()
60            try:
61                return self.reader.city(ip_address)
62            except GeoIP2Error, ValueError:
63                return None

Wrapper for Reader.city

def city_to_dict(self, city: geoip2.models.City | None) -> GeoIPDict | dict:
65    def city_to_dict(self, city: City | None) -> GeoIPDict | dict:
66        """Convert City to dict"""
67        if not city:
68            return {}
69        city_dict: GeoIPDict = {
70            "continent": city.continent.code,
71            "country": city.country.iso_code,
72            "lat": city.location.latitude,
73            "long": city.location.longitude,
74            "city": "",
75        }
76        if city.city.name:
77            city_dict["city"] = city.city.name
78        return city_dict

Convert City to dict

def city_dict(self, ip_address: str) -> GeoIPDict | None:
80    def city_dict(self, ip_address: str) -> GeoIPDict | None:
81        """Wrapper for self.city that returns a dict"""
82        city = self.city(ip_address)
83        if not city:
84            return None
85        return self.city_to_dict(city)

Wrapper for self.city that returns a dict

GEOIP_CONTEXT_PROCESSOR = <GeoIPContextProcessor object>