authentik.events.context_processors.asn
ASN Enricher
"""ASN Enricher"""

from typing import TYPE_CHECKING, TypedDict

from django.http import HttpRequest
from geoip2.errors import GeoIP2Error
from geoip2.models import ASN
from sentry_sdk import start_span

from authentik.events.context_processors.mmdb import MMDBContextProcessor
from authentik.lib.config import CONFIG
from authentik.root.middleware import ClientIPMiddleware

if TYPE_CHECKING:
    from authentik.api.v3.config import Capabilities
    from authentik.events.models import Event


class ASNDict(TypedDict):
    """ASN Details

    Shape of the lookup result that ``ASNContextProcessor`` stores under the
    ``"asn"`` context key.
    """

    # Copied from ``autonomous_system_number`` of the looked-up record.
    asn: int | None
    # Copied from ``autonomous_system_organization`` of the looked-up record.
    as_org: str | None
    # ``str()`` of the record's ``network``, or None when that is falsy.
    network: str | None


class ASNContextProcessor(MMDBContextProcessor):
    """ASN Database reader wrapper"""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Imported at call time; at module level the name is only imported
        # under TYPE_CHECKING.
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_ASN

    def path(self) -> str | None:
        """Return the ``events.context_processors.asn`` setting from CONFIG"""
        return CONFIG.get("events.context_processors.asn")

    def enrich_event(self, event: Event):
        """Store ASN details for ``event.client_ip`` in ``event.context["asn"]``

        The event is left untouched when the lookup yields nothing.
        """
        asn = self.asn_dict(event.client_ip)
        if not asn:
            return
        event.context["asn"] = asn

    def enrich_context(self, request: HttpRequest) -> dict:
        """Return ``{"asn": ...}`` for the client IP of ``request``

        The value is None when the lookup yields nothing.
        """
        return {
            "asn": self.asn_dict(ClientIPMiddleware.get_client_ip(request)),
        }

    def asn(self, ip_address: str) -> ASN | None:
        """Wrapper for Reader.asn

        The lookup runs inside a ``authentik.events.asn.asn`` span named after
        the address. Returns None when ``self.configured()`` is falsy, or when
        the reader raises ``GeoIP2Error`` or ``ValueError``.
        """
        with start_span(
            op="authentik.events.asn.asn",
            name=ip_address,
        ):
            if not self.configured():
                return None
            self.check_expired()
            try:
                return self.reader.asn(ip_address)
            except (GeoIP2Error, ValueError):
                # Explicit tuple: same behaviour as the bare comma form, but it
                # cannot be misread as ``except E, name`` and also parses on
                # interpreters older than 3.14.
                # NOTE(review): ValueError presumably covers malformed
                # addresses - confirm against the geoip2 reader.
                return None

    def asn_to_dict(self, asn: ASN | None) -> ASNDict | dict:
        """Convert ASN to dict

        Returns an empty dict for a falsy ``asn``; otherwise a dict with the
        ``asn``, ``as_org`` and ``network`` keys.
        """
        if not asn:
            return {}
        asn_dict: ASNDict = {
            "asn": asn.autonomous_system_number,
            "as_org": asn.autonomous_system_organization,
            "network": str(asn.network) if asn.network else None,
        }
        return asn_dict

    def asn_dict(self, ip_address: str) -> ASNDict | None:
        """Wrapper for self.asn that returns a dict

        Returns None (not an empty dict) when ``self.asn`` finds nothing.
        """
        asn = self.asn(ip_address)
        if not asn:
            return None
        return self.asn_to_dict(asn)


# Module-level instance, created at import time.
ASN_CONTEXT_PROCESSOR = ASNContextProcessor()
class
ASNDict(typing.TypedDict):
class ASNDict(TypedDict):
    """ASN Details

    Shape of the lookup result that ``ASNContextProcessor`` stores under the
    ``"asn"`` context key. Values are filled in by
    ``ASNContextProcessor.asn_to_dict``.
    """

    # Copied from ``autonomous_system_number`` of the looked-up record.
    asn: int | None
    # Copied from ``autonomous_system_organization`` of the looked-up record.
    as_org: str | None
    # ``str()`` of the record's ``network``, or None when that is falsy.
    network: str | None
ASN Details
class ASNContextProcessor(MMDBContextProcessor):
    """ASN Database reader wrapper"""

    def capability(self) -> Capabilities | None:
        """Return the capability this context processor provides"""
        # Imported at call time rather than at module load.
        from authentik.api.v3.config import Capabilities

        return Capabilities.CAN_ASN

    def path(self) -> str | None:
        """Return the ``events.context_processors.asn`` setting from CONFIG"""
        return CONFIG.get("events.context_processors.asn")

    def enrich_event(self, event: Event):
        """Store ASN details for ``event.client_ip`` in ``event.context["asn"]``

        The event is left untouched when the lookup yields nothing.
        """
        asn = self.asn_dict(event.client_ip)
        if not asn:
            return
        event.context["asn"] = asn

    def enrich_context(self, request: HttpRequest) -> dict:
        """Return ``{"asn": ...}`` for the client IP of ``request``

        The value is None when the lookup yields nothing.
        """
        return {
            "asn": self.asn_dict(ClientIPMiddleware.get_client_ip(request)),
        }

    def asn(self, ip_address: str) -> ASN | None:
        """Wrapper for Reader.asn

        The lookup runs inside a ``authentik.events.asn.asn`` span named after
        the address. Returns None when ``self.configured()`` is falsy, or when
        the reader raises ``GeoIP2Error`` or ``ValueError``.
        """
        with start_span(
            op="authentik.events.asn.asn",
            name=ip_address,
        ):
            if not self.configured():
                return None
            self.check_expired()
            try:
                return self.reader.asn(ip_address)
            except (GeoIP2Error, ValueError):
                # Explicit tuple: same behaviour as the bare comma form, but it
                # cannot be misread as ``except E, name`` and also parses on
                # interpreters older than 3.14.
                return None

    def asn_to_dict(self, asn: ASN | None) -> ASNDict | dict:
        """Convert ASN to dict

        Returns an empty dict for a falsy ``asn``; otherwise a dict with the
        ``asn``, ``as_org`` and ``network`` keys.
        """
        if not asn:
            return {}
        asn_dict: ASNDict = {
            "asn": asn.autonomous_system_number,
            "as_org": asn.autonomous_system_organization,
            "network": str(asn.network) if asn.network else None,
        }
        return asn_dict

    def asn_dict(self, ip_address: str) -> ASNDict | None:
        """Wrapper for self.asn that returns a dict

        Returns None (not an empty dict) when ``self.asn`` finds nothing.
        """
        asn = self.asn(ip_address)
        if not asn:
            return None
        return self.asn_to_dict(asn)
ASN Database reader wrapper
def
capability(self) -> authentik.api.v3.config.Capabilities | None:
def capability(self) -> Capabilities | None:
    """Return the capability this context processor provides"""
    # Resolved at call time, not at module import.
    from authentik.api.v3.config import Capabilities as capabilities

    return capabilities.CAN_ASN
Return the capability this context processor provides
def
enrich_event(self, event: authentik.events.models.Event):
def enrich_event(self, event: Event):
    """Store ASN details for ``event.client_ip`` in ``event.context["asn"]``

    Nothing is written when the lookup comes back empty.
    """
    details = self.asn_dict(event.client_ip)
    if details:
        event.context["asn"] = details
Modify event
def
enrich_context(self, request: django.http.request.HttpRequest) -> dict:
def enrich_context(self, request: HttpRequest) -> dict:
    """Return ``{"asn": ...}`` for the client IP of ``request``

    The value is whatever ``self.asn_dict`` returns for that IP.
    """
    client_ip = ClientIPMiddleware.get_client_ip(request)
    return {"asn": self.asn_dict(client_ip)}
Modify context
def
asn(self, ip_address: str) -> geoip2.models.ASN | None:
def asn(self, ip_address: str) -> ASN | None:
    """Wrapper for Reader.asn

    The lookup runs inside a ``authentik.events.asn.asn`` span named after
    the address. Returns None when ``self.configured()`` is falsy, or when
    the reader raises ``GeoIP2Error`` or ``ValueError``.
    """
    with start_span(
        op="authentik.events.asn.asn",
        name=ip_address,
    ):
        if not self.configured():
            return None
        self.check_expired()
        try:
            return self.reader.asn(ip_address)
        except (GeoIP2Error, ValueError):
            # Explicit tuple: same behaviour as the bare comma form, but it
            # cannot be misread as ``except E, name`` and also parses on
            # interpreters older than 3.14.
            # NOTE(review): ValueError presumably covers malformed
            # addresses - confirm against the geoip2 reader.
            return None
Wrapper for Reader.asn
def asn_to_dict(self, asn: ASN | None) -> ASNDict | dict:
    """Convert ASN to dict

    A falsy ``asn`` gives an empty dict. Otherwise the result carries the
    ``asn``, ``as_org`` and ``network`` keys, with ``network`` stringified
    (or None when the record's network is falsy).
    """
    if asn:
        network = asn.network
        details: ASNDict = {
            "asn": asn.autonomous_system_number,
            "as_org": asn.autonomous_system_organization,
            "network": None if not network else str(network),
        }
        return details
    return {}
Convert ASN to dict
ASN_CONTEXT_PROCESSOR =
<ASNContextProcessor object>