authentik.stages.user_login.middleware

Sessions bound to ASN/Network and GeoIP/Continent/etc

  1"""Sessions bound to ASN/Network and GeoIP/Continent/etc"""
  2
  3from django.contrib.auth.middleware import AuthenticationMiddleware
  4from django.contrib.auth.signals import user_logged_out
  5from django.contrib.auth.views import redirect_to_login
  6from django.http.request import HttpRequest
  7from structlog.stdlib import get_logger
  8
  9from authentik.core.middleware import get_user
 10from authentik.core.models import Session
 11from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR
 12from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR
 13from authentik.lib.sentry import SentryIgnoredException
 14from authentik.root.middleware import ClientIPMiddleware, SessionMiddleware
 15from authentik.stages.user_login.models import GeoIPBinding, NetworkBinding
 16
 17SESSION_KEY_BINDING_NET = "authentik/stages/user_login/binding/net"
 18SESSION_KEY_BINDING_GEO = "authentik/stages/user_login/binding/geo"
 19LOGGER = get_logger()
 20
 21
 22class SessionBindingBroken(SentryIgnoredException):
 23    """Session binding was broken due to specified `reason`"""
 24
 25    def __init__(  # noqa: PLR0913
 26        self, reason: str, old_value: str, new_value: str, old_ip: str, new_ip: str
 27    ) -> None:
 28        self.reason = reason
 29        self.old_value = old_value
 30        self.new_value = new_value
 31        self.old_ip = old_ip
 32        self.new_ip = new_ip
 33
 34    def __repr__(self) -> str:
 35        return (
 36            f"Session binding broken due to {self.reason}; "
 37            f"old value: {self.old_value}, new value: {self.new_value}"
 38        )
 39
 40    def to_event(self) -> dict:
 41        """Convert to dict for usage with event"""
 42        return {
 43            "logout_reason": "Session binding broken",
 44            "binding": {
 45                "reason": self.reason,
 46                "previous_value": self.old_value,
 47                "new_value": self.new_value,
 48            },
 49            "ip": {
 50                "previous": self.old_ip,
 51                "new": self.new_ip,
 52            },
 53        }
 54
 55
 56def logout_extra(request: HttpRequest, exc: SessionBindingBroken):
 57    """Similar to django's logout method, but able to carry more info to the signal"""
 58    # Since this middleware runs before the AuthenticationMiddleware, we can't use `request.user`
 59    # as it hasn't been populated yet.
 60    user = get_user(request)
 61    if not getattr(user, "is_authenticated", True):
 62        user = None
 63    # Dispatch the signal before the user is logged out so the receivers have a
 64    # chance to find out *who* logged out.
 65    user_logged_out.send(
 66        sender=user.__class__, request=request, user=user, event_extra=exc.to_event()
 67    )
 68    request.session.flush()
 69    if hasattr(request, "user"):
 70        from django.contrib.auth.models import AnonymousUser
 71
 72        request.user = AnonymousUser()
 73
 74
 75class BoundSessionMiddleware(SessionMiddleware):
 76    """Sessions bound to ASN/Network and GeoIP/Continent/etc"""
 77
 78    def process_request(self, request: HttpRequest):
 79        super().process_request(request)
 80        try:
 81            self.recheck_session(request)
 82        except SessionBindingBroken as exc:
 83            LOGGER.warning("Session binding broken", exc=exc)
 84            # At this point, we need to logout the current user
 85            # however since this middleware has to run before the `AuthenticationMiddleware`
 86            # we don't have access to the user yet
 87            # Logout will still work, however event logs won't display the user being logged out
 88            AuthenticationMiddleware(lambda request: request).process_request(request)
 89            logout_extra(request, exc)
 90            request.session.clear()
 91            return redirect_to_login(request.get_full_path())
 92        return None
 93
 94    def recheck_session(self, request: HttpRequest):
 95        """Check if a session is still valid with a changed IP"""
 96        last_ip = request.session.get(Session.Keys.LAST_IP)
 97        new_ip = ClientIPMiddleware.get_client_ip(request)
 98        # Check changed IP
 99        if new_ip == last_ip:
100            return
101        configured_binding_net = request.session.get(
102            SESSION_KEY_BINDING_NET, NetworkBinding.NO_BINDING
103        )
104        configured_binding_geo = request.session.get(
105            SESSION_KEY_BINDING_GEO, GeoIPBinding.NO_BINDING
106        )
107        if configured_binding_net != NetworkBinding.NO_BINDING:
108            BoundSessionMiddleware.recheck_session_net(configured_binding_net, last_ip, new_ip)
109        if configured_binding_geo != GeoIPBinding.NO_BINDING:
110            BoundSessionMiddleware.recheck_session_geo(configured_binding_geo, last_ip, new_ip)
111        # If we got to this point without any error being raised, we need to
112        # update the last saved IP to the current one
113        if SESSION_KEY_BINDING_NET in request.session or SESSION_KEY_BINDING_GEO in request.session:
114            # Only set the last IP in the session if there's a binding specified
115            # (== basically requires the user to be logged in)
116            request.session[Session.Keys.LAST_IP] = new_ip
117
118    @staticmethod
119    def recheck_session_net(binding: NetworkBinding, last_ip: str, new_ip: str):
120        """Check network/ASN binding"""
121        last_asn = ASN_CONTEXT_PROCESSOR.asn(last_ip)
122        new_asn = ASN_CONTEXT_PROCESSOR.asn(new_ip)
123        if not last_asn or not new_asn:
124            raise SessionBindingBroken(
125                "network.missing",
126                ASN_CONTEXT_PROCESSOR.asn_to_dict(last_asn),
127                ASN_CONTEXT_PROCESSOR.asn_to_dict(new_asn),
128                last_ip,
129                new_ip,
130            )
131        if binding in [
132            NetworkBinding.BIND_ASN,
133            NetworkBinding.BIND_ASN_NETWORK,
134            NetworkBinding.BIND_ASN_NETWORK_IP,
135        ]:
136            # Check ASN which is required for all 3 modes
137            if last_asn.autonomous_system_number != new_asn.autonomous_system_number:
138                raise SessionBindingBroken(
139                    "network.asn",
140                    last_asn.autonomous_system_number,
141                    new_asn.autonomous_system_number,
142                    last_ip,
143                    new_ip,
144                )
145        if binding in [NetworkBinding.BIND_ASN_NETWORK, NetworkBinding.BIND_ASN_NETWORK_IP]:
146            # Check Network afterwards
147            if last_asn.network != new_asn.network:
148                raise SessionBindingBroken(
149                    "network.asn_network",
150                    str(last_asn.network),
151                    str(new_asn.network),
152                    last_ip,
153                    new_ip,
154                )
155        if binding in [NetworkBinding.BIND_ASN_NETWORK_IP]:
156            # Only require strict IP checking
157            if last_ip != new_ip:
158                raise SessionBindingBroken(
159                    "network.ip",
160                    last_ip,
161                    new_ip,
162                    last_ip,
163                    new_ip,
164                )
165
166    @staticmethod
167    def recheck_session_geo(binding: GeoIPBinding, last_ip: str, new_ip: str):
168        """Check GeoIP binding"""
169        last_geo = GEOIP_CONTEXT_PROCESSOR.city(last_ip)
170        new_geo = GEOIP_CONTEXT_PROCESSOR.city(new_ip)
171        if not last_geo or not new_geo:
172            raise SessionBindingBroken(
173                "geoip.missing",
174                GEOIP_CONTEXT_PROCESSOR.city_to_dict(last_geo),
175                GEOIP_CONTEXT_PROCESSOR.city_to_dict(new_geo),
176                last_ip,
177                new_ip,
178            )
179        if binding in [
180            GeoIPBinding.BIND_CONTINENT,
181            GeoIPBinding.BIND_CONTINENT_COUNTRY,
182            GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY,
183        ]:
184            # Check Continent which is required for all 3 modes
185            if last_geo.continent != new_geo.continent:
186                raise SessionBindingBroken(
187                    "geoip.continent",
188                    last_geo.continent.to_dict(),
189                    new_geo.continent.to_dict(),
190                    last_ip,
191                    new_ip,
192                )
193        if binding in [
194            GeoIPBinding.BIND_CONTINENT_COUNTRY,
195            GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY,
196        ]:
197            # Check Country afterwards
198            if last_geo.country != new_geo.country:
199                raise SessionBindingBroken(
200                    "geoip.country",
201                    last_geo.country.to_dict(),
202                    new_geo.country.to_dict(),
203                    last_ip,
204                    new_ip,
205                )
206        if binding in [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY]:
207            # Check city afterwards
208            if last_geo.city != new_geo.city:
209                raise SessionBindingBroken(
210                    "geoip.city",
211                    last_geo.city.to_dict(),
212                    new_geo.city.to_dict(),
213                    last_ip,
214                    new_ip,
215                )
SESSION_KEY_BINDING_NET = 'authentik/stages/user_login/binding/net'
SESSION_KEY_BINDING_GEO = 'authentik/stages/user_login/binding/geo'
LOGGER = <BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
class SessionBindingBroken(authentik.lib.sentry.SentryIgnoredException):
23class SessionBindingBroken(SentryIgnoredException):
24    """Session binding was broken due to specified `reason`"""
25
26    def __init__(  # noqa: PLR0913
27        self, reason: str, old_value: str, new_value: str, old_ip: str, new_ip: str
28    ) -> None:
29        self.reason = reason
30        self.old_value = old_value
31        self.new_value = new_value
32        self.old_ip = old_ip
33        self.new_ip = new_ip
34
35    def __repr__(self) -> str:
36        return (
37            f"Session binding broken due to {self.reason}; "
38            f"old value: {self.old_value}, new value: {self.new_value}"
39        )
40
41    def to_event(self) -> dict:
42        """Convert to dict for usage with event"""
43        return {
44            "logout_reason": "Session binding broken",
45            "binding": {
46                "reason": self.reason,
47                "previous_value": self.old_value,
48                "new_value": self.new_value,
49            },
50            "ip": {
51                "previous": self.old_ip,
52                "new": self.new_ip,
53            },
54        }

Session binding was broken due to specified reason

SessionBindingBroken( reason: str, old_value: str, new_value: str, old_ip: str, new_ip: str)
26    def __init__(  # noqa: PLR0913
27        self, reason: str, old_value: str, new_value: str, old_ip: str, new_ip: str
28    ) -> None:
29        self.reason = reason
30        self.old_value = old_value
31        self.new_value = new_value
32        self.old_ip = old_ip
33        self.new_ip = new_ip
reason
old_value
new_value
old_ip
new_ip
def to_event(self) -> dict:
41    def to_event(self) -> dict:
42        """Convert to dict for usage with event"""
43        return {
44            "logout_reason": "Session binding broken",
45            "binding": {
46                "reason": self.reason,
47                "previous_value": self.old_value,
48                "new_value": self.new_value,
49            },
50            "ip": {
51                "previous": self.old_ip,
52                "new": self.new_ip,
53            },
54        }

Convert to dict for usage with event

def logout_extra( request: django.http.request.HttpRequest, exc: SessionBindingBroken):
57def logout_extra(request: HttpRequest, exc: SessionBindingBroken):
58    """Similar to django's logout method, but able to carry more info to the signal"""
59    # Since this middleware runs before the AuthenticationMiddleware, we can't use `request.user`
60    # as it hasn't been populated yet.
61    user = get_user(request)
62    if not getattr(user, "is_authenticated", True):
63        user = None
64    # Dispatch the signal before the user is logged out so the receivers have a
65    # chance to find out *who* logged out.
66    user_logged_out.send(
67        sender=user.__class__, request=request, user=user, event_extra=exc.to_event()
68    )
69    request.session.flush()
70    if hasattr(request, "user"):
71        from django.contrib.auth.models import AnonymousUser
72
73        request.user = AnonymousUser()

Similar to django's logout method, but able to carry more info to the signal

class BoundSessionMiddleware(authentik.root.middleware.SessionMiddleware):
 76class BoundSessionMiddleware(SessionMiddleware):
 77    """Sessions bound to ASN/Network and GeoIP/Continent/etc"""
 78
 79    def process_request(self, request: HttpRequest):
 80        super().process_request(request)
 81        try:
 82            self.recheck_session(request)
 83        except SessionBindingBroken as exc:
 84            LOGGER.warning("Session binding broken", exc=exc)
 85            # At this point, we need to logout the current user
 86            # however since this middleware has to run before the `AuthenticationMiddleware`
 87            # we don't have access to the user yet
 88            # Logout will still work, however event logs won't display the user being logged out
 89            AuthenticationMiddleware(lambda request: request).process_request(request)
 90            logout_extra(request, exc)
 91            request.session.clear()
 92            return redirect_to_login(request.get_full_path())
 93        return None
 94
 95    def recheck_session(self, request: HttpRequest):
 96        """Check if a session is still valid with a changed IP"""
 97        last_ip = request.session.get(Session.Keys.LAST_IP)
 98        new_ip = ClientIPMiddleware.get_client_ip(request)
 99        # Check changed IP
100        if new_ip == last_ip:
101            return
102        configured_binding_net = request.session.get(
103            SESSION_KEY_BINDING_NET, NetworkBinding.NO_BINDING
104        )
105        configured_binding_geo = request.session.get(
106            SESSION_KEY_BINDING_GEO, GeoIPBinding.NO_BINDING
107        )
108        if configured_binding_net != NetworkBinding.NO_BINDING:
109            BoundSessionMiddleware.recheck_session_net(configured_binding_net, last_ip, new_ip)
110        if configured_binding_geo != GeoIPBinding.NO_BINDING:
111            BoundSessionMiddleware.recheck_session_geo(configured_binding_geo, last_ip, new_ip)
112        # If we got to this point without any error being raised, we need to
113        # update the last saved IP to the current one
114        if SESSION_KEY_BINDING_NET in request.session or SESSION_KEY_BINDING_GEO in request.session:
115            # Only set the last IP in the session if there's a binding specified
116            # (== basically requires the user to be logged in)
117            request.session[Session.Keys.LAST_IP] = new_ip
118
119    @staticmethod
120    def recheck_session_net(binding: NetworkBinding, last_ip: str, new_ip: str):
121        """Check network/ASN binding"""
122        last_asn = ASN_CONTEXT_PROCESSOR.asn(last_ip)
123        new_asn = ASN_CONTEXT_PROCESSOR.asn(new_ip)
124        if not last_asn or not new_asn:
125            raise SessionBindingBroken(
126                "network.missing",
127                ASN_CONTEXT_PROCESSOR.asn_to_dict(last_asn),
128                ASN_CONTEXT_PROCESSOR.asn_to_dict(new_asn),
129                last_ip,
130                new_ip,
131            )
132        if binding in [
133            NetworkBinding.BIND_ASN,
134            NetworkBinding.BIND_ASN_NETWORK,
135            NetworkBinding.BIND_ASN_NETWORK_IP,
136        ]:
137            # Check ASN which is required for all 3 modes
138            if last_asn.autonomous_system_number != new_asn.autonomous_system_number:
139                raise SessionBindingBroken(
140                    "network.asn",
141                    last_asn.autonomous_system_number,
142                    new_asn.autonomous_system_number,
143                    last_ip,
144                    new_ip,
145                )
146        if binding in [NetworkBinding.BIND_ASN_NETWORK, NetworkBinding.BIND_ASN_NETWORK_IP]:
147            # Check Network afterwards
148            if last_asn.network != new_asn.network:
149                raise SessionBindingBroken(
150                    "network.asn_network",
151                    str(last_asn.network),
152                    str(new_asn.network),
153                    last_ip,
154                    new_ip,
155                )
156        if binding in [NetworkBinding.BIND_ASN_NETWORK_IP]:
157            # Only require strict IP checking
158            if last_ip != new_ip:
159                raise SessionBindingBroken(
160                    "network.ip",
161                    last_ip,
162                    new_ip,
163                    last_ip,
164                    new_ip,
165                )
166
167    @staticmethod
168    def recheck_session_geo(binding: GeoIPBinding, last_ip: str, new_ip: str):
169        """Check GeoIP binding"""
170        last_geo = GEOIP_CONTEXT_PROCESSOR.city(last_ip)
171        new_geo = GEOIP_CONTEXT_PROCESSOR.city(new_ip)
172        if not last_geo or not new_geo:
173            raise SessionBindingBroken(
174                "geoip.missing",
175                GEOIP_CONTEXT_PROCESSOR.city_to_dict(last_geo),
176                GEOIP_CONTEXT_PROCESSOR.city_to_dict(new_geo),
177                last_ip,
178                new_ip,
179            )
180        if binding in [
181            GeoIPBinding.BIND_CONTINENT,
182            GeoIPBinding.BIND_CONTINENT_COUNTRY,
183            GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY,
184        ]:
185            # Check Continent which is required for all 3 modes
186            if last_geo.continent != new_geo.continent:
187                raise SessionBindingBroken(
188                    "geoip.continent",
189                    last_geo.continent.to_dict(),
190                    new_geo.continent.to_dict(),
191                    last_ip,
192                    new_ip,
193                )
194        if binding in [
195            GeoIPBinding.BIND_CONTINENT_COUNTRY,
196            GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY,
197        ]:
198            # Check Country afterwards
199            if last_geo.country != new_geo.country:
200                raise SessionBindingBroken(
201                    "geoip.country",
202                    last_geo.country.to_dict(),
203                    new_geo.country.to_dict(),
204                    last_ip,
205                    new_ip,
206                )
207        if binding in [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY]:
208            # Check city afterwards
209            if last_geo.city != new_geo.city:
210                raise SessionBindingBroken(
211                    "geoip.city",
212                    last_geo.city.to_dict(),
213                    new_geo.city.to_dict(),
214                    last_ip,
215                    new_ip,
216                )

Sessions bound to ASN/Network and GeoIP/Continent/etc

def process_request(self, request: django.http.request.HttpRequest):
79    def process_request(self, request: HttpRequest):
80        super().process_request(request)
81        try:
82            self.recheck_session(request)
83        except SessionBindingBroken as exc:
84            LOGGER.warning("Session binding broken", exc=exc)
85            # At this point, we need to logout the current user
86            # however since this middleware has to run before the `AuthenticationMiddleware`
87            # we don't have access to the user yet
88            # Logout will still work, however event logs won't display the user being logged out
89            AuthenticationMiddleware(lambda request: request).process_request(request)
90            logout_extra(request, exc)
91            request.session.clear()
92            return redirect_to_login(request.get_full_path())
93        return None
def recheck_session(self, request: django.http.request.HttpRequest):
 95    def recheck_session(self, request: HttpRequest):
 96        """Check if a session is still valid with a changed IP"""
 97        last_ip = request.session.get(Session.Keys.LAST_IP)
 98        new_ip = ClientIPMiddleware.get_client_ip(request)
 99        # Check changed IP
100        if new_ip == last_ip:
101            return
102        configured_binding_net = request.session.get(
103            SESSION_KEY_BINDING_NET, NetworkBinding.NO_BINDING
104        )
105        configured_binding_geo = request.session.get(
106            SESSION_KEY_BINDING_GEO, GeoIPBinding.NO_BINDING
107        )
108        if configured_binding_net != NetworkBinding.NO_BINDING:
109            BoundSessionMiddleware.recheck_session_net(configured_binding_net, last_ip, new_ip)
110        if configured_binding_geo != GeoIPBinding.NO_BINDING:
111            BoundSessionMiddleware.recheck_session_geo(configured_binding_geo, last_ip, new_ip)
112        # If we got to this point without any error being raised, we need to
113        # update the last saved IP to the current one
114        if SESSION_KEY_BINDING_NET in request.session or SESSION_KEY_BINDING_GEO in request.session:
115            # Only set the last IP in the session if there's a binding specified
116            # (== basically requires the user to be logged in)
117            request.session[Session.Keys.LAST_IP] = new_ip

Check if a session is still valid with a changed IP

@staticmethod
def recheck_session_net( binding: authentik.stages.user_login.models.NetworkBinding, last_ip: str, new_ip: str):
119    @staticmethod
120    def recheck_session_net(binding: NetworkBinding, last_ip: str, new_ip: str):
121        """Check network/ASN binding"""
122        last_asn = ASN_CONTEXT_PROCESSOR.asn(last_ip)
123        new_asn = ASN_CONTEXT_PROCESSOR.asn(new_ip)
124        if not last_asn or not new_asn:
125            raise SessionBindingBroken(
126                "network.missing",
127                ASN_CONTEXT_PROCESSOR.asn_to_dict(last_asn),
128                ASN_CONTEXT_PROCESSOR.asn_to_dict(new_asn),
129                last_ip,
130                new_ip,
131            )
132        if binding in [
133            NetworkBinding.BIND_ASN,
134            NetworkBinding.BIND_ASN_NETWORK,
135            NetworkBinding.BIND_ASN_NETWORK_IP,
136        ]:
137            # Check ASN which is required for all 3 modes
138            if last_asn.autonomous_system_number != new_asn.autonomous_system_number:
139                raise SessionBindingBroken(
140                    "network.asn",
141                    last_asn.autonomous_system_number,
142                    new_asn.autonomous_system_number,
143                    last_ip,
144                    new_ip,
145                )
146        if binding in [NetworkBinding.BIND_ASN_NETWORK, NetworkBinding.BIND_ASN_NETWORK_IP]:
147            # Check Network afterwards
148            if last_asn.network != new_asn.network:
149                raise SessionBindingBroken(
150                    "network.asn_network",
151                    str(last_asn.network),
152                    str(new_asn.network),
153                    last_ip,
154                    new_ip,
155                )
156        if binding in [NetworkBinding.BIND_ASN_NETWORK_IP]:
157            # Only require strict IP checking
158            if last_ip != new_ip:
159                raise SessionBindingBroken(
160                    "network.ip",
161                    last_ip,
162                    new_ip,
163                    last_ip,
164                    new_ip,
165                )

Check network/ASN binding

@staticmethod
def recheck_session_geo( binding: authentik.stages.user_login.models.GeoIPBinding, last_ip: str, new_ip: str):
167    @staticmethod
168    def recheck_session_geo(binding: GeoIPBinding, last_ip: str, new_ip: str):
169        """Check GeoIP binding"""
170        last_geo = GEOIP_CONTEXT_PROCESSOR.city(last_ip)
171        new_geo = GEOIP_CONTEXT_PROCESSOR.city(new_ip)
172        if not last_geo or not new_geo:
173            raise SessionBindingBroken(
174                "geoip.missing",
175                GEOIP_CONTEXT_PROCESSOR.city_to_dict(last_geo),
176                GEOIP_CONTEXT_PROCESSOR.city_to_dict(new_geo),
177                last_ip,
178                new_ip,
179            )
180        if binding in [
181            GeoIPBinding.BIND_CONTINENT,
182            GeoIPBinding.BIND_CONTINENT_COUNTRY,
183            GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY,
184        ]:
185            # Check Continent which is required for all 3 modes
186            if last_geo.continent != new_geo.continent:
187                raise SessionBindingBroken(
188                    "geoip.continent",
189                    last_geo.continent.to_dict(),
190                    new_geo.continent.to_dict(),
191                    last_ip,
192                    new_ip,
193                )
194        if binding in [
195            GeoIPBinding.BIND_CONTINENT_COUNTRY,
196            GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY,
197        ]:
198            # Check Country afterwards
199            if last_geo.country != new_geo.country:
200                raise SessionBindingBroken(
201                    "geoip.country",
202                    last_geo.country.to_dict(),
203                    new_geo.country.to_dict(),
204                    last_ip,
205                    new_ip,
206                )
207        if binding in [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY]:
208            # Check city afterwards
209            if last_geo.city != new_geo.city:
210                raise SessionBindingBroken(
211                    "geoip.city",
212                    last_geo.city.to_dict(),
213                    new_geo.city.to_dict(),
214                    last_ip,
215                    new_ip,
216                )

Check GeoIP binding