authentik.events.context_processors.asn

ASN Enricher

 1"""ASN Enricher"""
 2
 3from typing import TYPE_CHECKING, TypedDict
 4
 5from django.http import HttpRequest
 6from geoip2.errors import GeoIP2Error
 7from geoip2.models import ASN
 8from sentry_sdk import start_span
 9
10from authentik.events.context_processors.mmdb import MMDBContextProcessor
11from authentik.lib.config import CONFIG
12from authentik.root.middleware import ClientIPMiddleware
13
14if TYPE_CHECKING:
15    from authentik.api.v3.config import Capabilities
16    from authentik.events.models import Event
17
18
19class ASNDict(TypedDict):
20    """ASN Details"""
21
22    asn: int | None
23    as_org: str | None
24    network: str | None
25
26
class ASNContextProcessor(MMDBContextProcessor):
    """ASN Database reader wrapper

    Looks up autonomous system details for client IP addresses and exposes
    them under the ``asn`` key of events and request contexts."""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Imported locally: at module level this name is only imported for
        # type checking.
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_ASN

    def path(self) -> str | None:
        """Get the path to the MMDB file to load"""
        return CONFIG.get("events.context_processors.asn")

    def enrich_event(self, event: Event):
        """Modify event

        Stores the ASN details for the event's client IP under
        ``event.context["asn"]``; the context is left untouched when the
        lookup yields nothing."""
        asn = self.asn_dict(event.client_ip)
        if not asn:
            return
        event.context["asn"] = asn

    def enrich_context(self, request: HttpRequest) -> dict:
        """Modify context

        Returns a dict whose ``asn`` entry holds the ASN details for the
        request's client IP, or ``None`` when the lookup yields nothing."""
        return {
            "asn": self.asn_dict(ClientIPMiddleware.get_client_ip(request)),
        }

    def asn(self, ip_address: str) -> ASN | None:
        """Wrapper for Reader.asn

        Returns ``None`` when the processor is not configured, or when the
        lookup raises ``GeoIP2Error`` or ``ValueError``."""
        with start_span(
            op="authentik.events.asn.asn",
            name=ip_address,
        ):
            if not self.configured():
                return None
            self.check_expired()
            # The exception tuple is parenthesized so the clause also parses
            # on Python versions before 3.14.
            try:
                return self.reader.asn(ip_address)
            except (GeoIP2Error, ValueError):
                return None

    def asn_to_dict(self, asn: ASN | None) -> ASNDict | dict:
        """Convert ASN to dict

        A falsy ``asn`` yields an empty dict. The network is rendered as a
        string, or ``None`` when it is unset."""
        if not asn:
            return {}
        asn_dict: ASNDict = {
            "asn": asn.autonomous_system_number,
            "as_org": asn.autonomous_system_organization,
            "network": str(asn.network) if asn.network else None,
        }
        return asn_dict

    def asn_dict(self, ip_address: str) -> ASNDict | None:
        """Wrapper for self.asn that returns a dict

        Returns ``None`` when the lookup produces no result."""
        asn = self.asn(ip_address)
        if not asn:
            return None
        return self.asn_to_dict(asn)
80
81
# Module-level ASNContextProcessor instance, created once when this module is imported
ASN_CONTEXT_PROCESSOR = ASNContextProcessor()
class ASNDict(typing.TypedDict):
20class ASNDict(TypedDict):
21    """ASN Details"""
22
23    asn: int | None
24    as_org: str | None
25    network: str | None

ASN Details

asn: int | None
as_org: str | None
network: str | None
class ASNContextProcessor(authentik.events.context_processors.mmdb.MMDBContextProcessor):
class ASNContextProcessor(MMDBContextProcessor):
    """ASN Database reader wrapper

    Looks up autonomous system details for client IP addresses and exposes
    them under the ``asn`` key of events and request contexts."""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Imported locally: at module level this name is only imported for
        # type checking.
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_ASN

    def path(self) -> str | None:
        """Get the path to the MMDB file to load"""
        return CONFIG.get("events.context_processors.asn")

    def enrich_event(self, event: Event):
        """Modify event

        Stores the ASN details for the event's client IP under
        ``event.context["asn"]``; the context is left untouched when the
        lookup yields nothing."""
        asn = self.asn_dict(event.client_ip)
        if not asn:
            return
        event.context["asn"] = asn

    def enrich_context(self, request: HttpRequest) -> dict:
        """Modify context

        Returns a dict whose ``asn`` entry holds the ASN details for the
        request's client IP, or ``None`` when the lookup yields nothing."""
        return {
            "asn": self.asn_dict(ClientIPMiddleware.get_client_ip(request)),
        }

    def asn(self, ip_address: str) -> ASN | None:
        """Wrapper for Reader.asn

        Returns ``None`` when the processor is not configured, or when the
        lookup raises ``GeoIP2Error`` or ``ValueError``."""
        with start_span(
            op="authentik.events.asn.asn",
            name=ip_address,
        ):
            if not self.configured():
                return None
            self.check_expired()
            # The exception tuple is parenthesized so the clause also parses
            # on Python versions before 3.14.
            try:
                return self.reader.asn(ip_address)
            except (GeoIP2Error, ValueError):
                return None

    def asn_to_dict(self, asn: ASN | None) -> ASNDict | dict:
        """Convert ASN to dict

        A falsy ``asn`` yields an empty dict. The network is rendered as a
        string, or ``None`` when it is unset."""
        if not asn:
            return {}
        asn_dict: ASNDict = {
            "asn": asn.autonomous_system_number,
            "as_org": asn.autonomous_system_organization,
            "network": str(asn.network) if asn.network else None,
        }
        return asn_dict

    def asn_dict(self, ip_address: str) -> ASNDict | None:
        """Wrapper for self.asn that returns a dict

        Returns ``None`` when the lookup produces no result."""
        asn = self.asn(ip_address)
        if not asn:
            return None
        return self.asn_to_dict(asn)

ASN Database reader wrapper

def capability(self) -> authentik.api.v3.config.Capabilities | None:
31    def capability(self) -> Capabilities | None:
32        from authentik.api.v3.config import Capabilities
33
34        return Capabilities.CAN_ASN

Return the capability this context processor provides

def path(self) -> str | None:
36    def path(self) -> str | None:
37        return CONFIG.get("events.context_processors.asn")

Get the path to the MMDB file to load

def enrich_event(self, event: authentik.events.models.Event):
39    def enrich_event(self, event: Event):
40        asn = self.asn_dict(event.client_ip)
41        if not asn:
42            return
43        event.context["asn"] = asn

Modify event

def enrich_context(self, request: django.http.request.HttpRequest) -> dict:
45    def enrich_context(self, request: HttpRequest) -> dict:
46        return {
47            "asn": self.asn_dict(ClientIPMiddleware.get_client_ip(request)),
48        }

Modify context

def asn(self, ip_address: str) -> geoip2.models.ASN | None:
50    def asn(self, ip_address: str) -> ASN | None:
51        """Wrapper for Reader.asn"""
52        with start_span(
53            op="authentik.events.asn.asn",
54            name=ip_address,
55        ):
56            if not self.configured():
57                return None
58            self.check_expired()
59            try:
60                return self.reader.asn(ip_address)
61            except GeoIP2Error, ValueError:
62                return None

Wrapper for Reader.asn

def asn_to_dict(self, asn: geoip2.models.ASN | None) -> ASNDict | dict:
64    def asn_to_dict(self, asn: ASN | None) -> ASNDict | dict:
65        """Convert ASN to dict"""
66        if not asn:
67            return {}
68        asn_dict: ASNDict = {
69            "asn": asn.autonomous_system_number,
70            "as_org": asn.autonomous_system_organization,
71            "network": str(asn.network) if asn.network else None,
72        }
73        return asn_dict

Convert ASN to dict

def asn_dict(self, ip_address: str) -> ASNDict | None:
75    def asn_dict(self, ip_address: str) -> ASNDict | None:
76        """Wrapper for self.asn that returns a dict"""
77        asn = self.asn(ip_address)
78        if not asn:
79            return None
80        return self.asn_to_dict(asn)

Wrapper for self.asn that returns a dict

ASN_CONTEXT_PROCESSOR = <ASNContextProcessor object>