authentik.stages.user_login.middleware
Sessions bound to ASN/Network and GeoIP/Continent/etc
1"""Sessions bound to ASN/Network and GeoIP/Continent/etc""" 2 3from django.contrib.auth.middleware import AuthenticationMiddleware 4from django.contrib.auth.signals import user_logged_out 5from django.contrib.auth.views import redirect_to_login 6from django.http.request import HttpRequest 7from structlog.stdlib import get_logger 8 9from authentik.core.middleware import get_user 10from authentik.core.models import Session 11from authentik.events.context_processors.asn import ASN_CONTEXT_PROCESSOR 12from authentik.events.context_processors.geoip import GEOIP_CONTEXT_PROCESSOR 13from authentik.lib.sentry import SentryIgnoredException 14from authentik.root.middleware import ClientIPMiddleware, SessionMiddleware 15from authentik.stages.user_login.models import GeoIPBinding, NetworkBinding 16 17SESSION_KEY_BINDING_NET = "authentik/stages/user_login/binding/net" 18SESSION_KEY_BINDING_GEO = "authentik/stages/user_login/binding/geo" 19LOGGER = get_logger() 20 21 22class SessionBindingBroken(SentryIgnoredException): 23 """Session binding was broken due to specified `reason`""" 24 25 def __init__( # noqa: PLR0913 26 self, reason: str, old_value: str, new_value: str, old_ip: str, new_ip: str 27 ) -> None: 28 self.reason = reason 29 self.old_value = old_value 30 self.new_value = new_value 31 self.old_ip = old_ip 32 self.new_ip = new_ip 33 34 def __repr__(self) -> str: 35 return ( 36 f"Session binding broken due to {self.reason}; " 37 f"old value: {self.old_value}, new value: {self.new_value}" 38 ) 39 40 def to_event(self) -> dict: 41 """Convert to dict for usage with event""" 42 return { 43 "logout_reason": "Session binding broken", 44 "binding": { 45 "reason": self.reason, 46 "previous_value": self.old_value, 47 "new_value": self.new_value, 48 }, 49 "ip": { 50 "previous": self.old_ip, 51 "new": self.new_ip, 52 }, 53 } 54 55 56def logout_extra(request: HttpRequest, exc: SessionBindingBroken): 57 """Similar to django's logout method, but able to carry more info to the signal""" 58 # Since this middleware runs before the AuthenticationMiddleware, we can't use `request.user` 59 # as it hasn't been populated yet. 60 user = get_user(request) 61 if not getattr(user, "is_authenticated", True): 62 user = None 63 # Dispatch the signal before the user is logged out so the receivers have a 64 # chance to find out *who* logged out. 65 user_logged_out.send( 66 sender=user.__class__, request=request, user=user, event_extra=exc.to_event() 67 ) 68 request.session.flush() 69 if hasattr(request, "user"): 70 from django.contrib.auth.models import AnonymousUser 71 72 request.user = AnonymousUser() 73 74 75class BoundSessionMiddleware(SessionMiddleware): 76 """Sessions bound to ASN/Network and GeoIP/Continent/etc""" 77 78 def process_request(self, request: HttpRequest): 79 super().process_request(request) 80 try: 81 self.recheck_session(request) 82 except SessionBindingBroken as exc: 83 LOGGER.warning("Session binding broken", exc=exc) 84 # At this point, we need to logout the current user 85 # however since this middleware has to run before the `AuthenticationMiddleware` 86 # we don't have access to the user yet 87 # Logout will still work, however event logs won't display the user being logged out 88 AuthenticationMiddleware(lambda request: request).process_request(request) 89 logout_extra(request, exc) 90 request.session.clear() 91 return redirect_to_login(request.get_full_path()) 92 return None 93 94 def recheck_session(self, request: HttpRequest): 95 """Check if a session is still valid with a changed IP""" 96 last_ip = request.session.get(Session.Keys.LAST_IP) 97 new_ip = ClientIPMiddleware.get_client_ip(request) 98 # Check changed IP 99 if new_ip == last_ip: 100 return 101 configured_binding_net = request.session.get( 102 SESSION_KEY_BINDING_NET, NetworkBinding.NO_BINDING 103 ) 104 configured_binding_geo = request.session.get( 105 SESSION_KEY_BINDING_GEO, GeoIPBinding.NO_BINDING 106 ) 107 if configured_binding_net != NetworkBinding.NO_BINDING: 108 BoundSessionMiddleware.recheck_session_net(configured_binding_net, last_ip, new_ip) 109 if configured_binding_geo != GeoIPBinding.NO_BINDING: 110 BoundSessionMiddleware.recheck_session_geo(configured_binding_geo, last_ip, new_ip) 111 # If we got to this point without any error being raised, we need to 112 # update the last saved IP to the current one 113 if SESSION_KEY_BINDING_NET in request.session or SESSION_KEY_BINDING_GEO in request.session: 114 # Only set the last IP in the session if there's a binding specified 115 # (== basically requires the user to be logged in) 116 request.session[Session.Keys.LAST_IP] = new_ip 117 118 @staticmethod 119 def recheck_session_net(binding: NetworkBinding, last_ip: str, new_ip: str): 120 """Check network/ASN binding""" 121 last_asn = ASN_CONTEXT_PROCESSOR.asn(last_ip) 122 new_asn = ASN_CONTEXT_PROCESSOR.asn(new_ip) 123 if not last_asn or not new_asn: 124 raise SessionBindingBroken( 125 "network.missing", 126 ASN_CONTEXT_PROCESSOR.asn_to_dict(last_asn), 127 ASN_CONTEXT_PROCESSOR.asn_to_dict(new_asn), 128 last_ip, 129 new_ip, 130 ) 131 if binding in [ 132 NetworkBinding.BIND_ASN, 133 NetworkBinding.BIND_ASN_NETWORK, 134 NetworkBinding.BIND_ASN_NETWORK_IP, 135 ]: 136 # Check ASN which is required for all 3 modes 137 if last_asn.autonomous_system_number != new_asn.autonomous_system_number: 138 raise SessionBindingBroken( 139 "network.asn", 140 last_asn.autonomous_system_number, 141 new_asn.autonomous_system_number, 142 last_ip, 143 new_ip, 144 ) 145 if binding in [NetworkBinding.BIND_ASN_NETWORK, NetworkBinding.BIND_ASN_NETWORK_IP]: 146 # Check Network afterwards 147 if last_asn.network != new_asn.network: 148 raise SessionBindingBroken( 149 "network.asn_network", 150 str(last_asn.network), 151 str(new_asn.network), 152 last_ip, 153 new_ip, 154 ) 155 if binding in [NetworkBinding.BIND_ASN_NETWORK_IP]: 156 # Only require strict IP checking 157 if last_ip != new_ip: 158 raise SessionBindingBroken( 159 "network.ip", 160 last_ip, 161 new_ip, 162 last_ip, 163 new_ip, 164 ) 165 166 @staticmethod 167 def recheck_session_geo(binding: GeoIPBinding, last_ip: str, new_ip: str): 168 """Check GeoIP binding""" 169 last_geo = GEOIP_CONTEXT_PROCESSOR.city(last_ip) 170 new_geo = GEOIP_CONTEXT_PROCESSOR.city(new_ip) 171 if not last_geo or not new_geo: 172 raise SessionBindingBroken( 173 "geoip.missing", 174 GEOIP_CONTEXT_PROCESSOR.city_to_dict(last_geo), 175 GEOIP_CONTEXT_PROCESSOR.city_to_dict(new_geo), 176 last_ip, 177 new_ip, 178 ) 179 if binding in [ 180 GeoIPBinding.BIND_CONTINENT, 181 GeoIPBinding.BIND_CONTINENT_COUNTRY, 182 GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, 183 ]: 184 # Check Continent which is required for all 3 modes 185 if last_geo.continent != new_geo.continent: 186 raise SessionBindingBroken( 187 "geoip.continent", 188 last_geo.continent.to_dict(), 189 new_geo.continent.to_dict(), 190 last_ip, 191 new_ip, 192 ) 193 if binding in [ 194 GeoIPBinding.BIND_CONTINENT_COUNTRY, 195 GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, 196 ]: 197 # Check Country afterwards 198 if last_geo.country != new_geo.country: 199 raise SessionBindingBroken( 200 "geoip.country", 201 last_geo.country.to_dict(), 202 new_geo.country.to_dict(), 203 last_ip, 204 new_ip, 205 ) 206 if binding in [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY]: 207 # Check city afterwards 208 if last_geo.city != new_geo.city: 209 raise SessionBindingBroken( 210 "geoip.city", 211 last_geo.city.to_dict(), 212 new_geo.city.to_dict(), 213 last_ip, 214 new_ip, 215 )
SESSION_KEY_BINDING_NET =
'authentik/stages/user_login/binding/net'
SESSION_KEY_BINDING_GEO =
'authentik/stages/user_login/binding/geo'
LOGGER =
<BoundLoggerLazyProxy(logger=None, wrapper_class=None, processors=None, context_class=None, initial_values={}, logger_factory_args=())>
23class SessionBindingBroken(SentryIgnoredException): 24 """Session binding was broken due to specified `reason`""" 25 26 def __init__( # noqa: PLR0913 27 self, reason: str, old_value: str, new_value: str, old_ip: str, new_ip: str 28 ) -> None: 29 self.reason = reason 30 self.old_value = old_value 31 self.new_value = new_value 32 self.old_ip = old_ip 33 self.new_ip = new_ip 34 35 def __repr__(self) -> str: 36 return ( 37 f"Session binding broken due to {self.reason}; " 38 f"old value: {self.old_value}, new value: {self.new_value}" 39 ) 40 41 def to_event(self) -> dict: 42 """Convert to dict for usage with event""" 43 return { 44 "logout_reason": "Session binding broken", 45 "binding": { 46 "reason": self.reason, 47 "previous_value": self.old_value, 48 "new_value": self.new_value, 49 }, 50 "ip": { 51 "previous": self.old_ip, 52 "new": self.new_ip, 53 }, 54 }
Session binding was broken due to specified reason
def
to_event(self) -> dict:
41 def to_event(self) -> dict: 42 """Convert to dict for usage with event""" 43 return { 44 "logout_reason": "Session binding broken", 45 "binding": { 46 "reason": self.reason, 47 "previous_value": self.old_value, 48 "new_value": self.new_value, 49 }, 50 "ip": { 51 "previous": self.old_ip, 52 "new": self.new_ip, 53 }, 54 }
Convert to dict for usage with event
57def logout_extra(request: HttpRequest, exc: SessionBindingBroken): 58 """Similar to django's logout method, but able to carry more info to the signal""" 59 # Since this middleware runs before the AuthenticationMiddleware, we can't use `request.user` 60 # as it hasn't been populated yet. 61 user = get_user(request) 62 if not getattr(user, "is_authenticated", True): 63 user = None 64 # Dispatch the signal before the user is logged out so the receivers have a 65 # chance to find out *who* logged out. 66 user_logged_out.send( 67 sender=user.__class__, request=request, user=user, event_extra=exc.to_event() 68 ) 69 request.session.flush() 70 if hasattr(request, "user"): 71 from django.contrib.auth.models import AnonymousUser 72 73 request.user = AnonymousUser()
Similar to django's logout method, but able to carry more info to the signal
76class BoundSessionMiddleware(SessionMiddleware): 77 """Sessions bound to ASN/Network and GeoIP/Continent/etc""" 78 79 def process_request(self, request: HttpRequest): 80 super().process_request(request) 81 try: 82 self.recheck_session(request) 83 except SessionBindingBroken as exc: 84 LOGGER.warning("Session binding broken", exc=exc) 85 # At this point, we need to logout the current user 86 # however since this middleware has to run before the `AuthenticationMiddleware` 87 # we don't have access to the user yet 88 # Logout will still work, however event logs won't display the user being logged out 89 AuthenticationMiddleware(lambda request: request).process_request(request) 90 logout_extra(request, exc) 91 request.session.clear() 92 return redirect_to_login(request.get_full_path()) 93 return None 94 95 def recheck_session(self, request: HttpRequest): 96 """Check if a session is still valid with a changed IP""" 97 last_ip = request.session.get(Session.Keys.LAST_IP) 98 new_ip = ClientIPMiddleware.get_client_ip(request) 99 # Check changed IP 100 if new_ip == last_ip: 101 return 102 configured_binding_net = request.session.get( 103 SESSION_KEY_BINDING_NET, NetworkBinding.NO_BINDING 104 ) 105 configured_binding_geo = request.session.get( 106 SESSION_KEY_BINDING_GEO, GeoIPBinding.NO_BINDING 107 ) 108 if configured_binding_net != NetworkBinding.NO_BINDING: 109 BoundSessionMiddleware.recheck_session_net(configured_binding_net, last_ip, new_ip) 110 if configured_binding_geo != GeoIPBinding.NO_BINDING: 111 BoundSessionMiddleware.recheck_session_geo(configured_binding_geo, last_ip, new_ip) 112 # If we got to this point without any error being raised, we need to 113 # update the last saved IP to the current one 114 if SESSION_KEY_BINDING_NET in request.session or SESSION_KEY_BINDING_GEO in request.session: 115 # Only set the last IP in the session if there's a binding specified 116 # (== basically requires the user to be logged in) 117 request.session[Session.Keys.LAST_IP] = new_ip 118 119 @staticmethod 120 def recheck_session_net(binding: NetworkBinding, last_ip: str, new_ip: str): 121 """Check network/ASN binding""" 122 last_asn = ASN_CONTEXT_PROCESSOR.asn(last_ip) 123 new_asn = ASN_CONTEXT_PROCESSOR.asn(new_ip) 124 if not last_asn or not new_asn: 125 raise SessionBindingBroken( 126 "network.missing", 127 ASN_CONTEXT_PROCESSOR.asn_to_dict(last_asn), 128 ASN_CONTEXT_PROCESSOR.asn_to_dict(new_asn), 129 last_ip, 130 new_ip, 131 ) 132 if binding in [ 133 NetworkBinding.BIND_ASN, 134 NetworkBinding.BIND_ASN_NETWORK, 135 NetworkBinding.BIND_ASN_NETWORK_IP, 136 ]: 137 # Check ASN which is required for all 3 modes 138 if last_asn.autonomous_system_number != new_asn.autonomous_system_number: 139 raise SessionBindingBroken( 140 "network.asn", 141 last_asn.autonomous_system_number, 142 new_asn.autonomous_system_number, 143 last_ip, 144 new_ip, 145 ) 146 if binding in [NetworkBinding.BIND_ASN_NETWORK, NetworkBinding.BIND_ASN_NETWORK_IP]: 147 # Check Network afterwards 148 if last_asn.network != new_asn.network: 149 raise SessionBindingBroken( 150 "network.asn_network", 151 str(last_asn.network), 152 str(new_asn.network), 153 last_ip, 154 new_ip, 155 ) 156 if binding in [NetworkBinding.BIND_ASN_NETWORK_IP]: 157 # Only require strict IP checking 158 if last_ip != new_ip: 159 raise SessionBindingBroken( 160 "network.ip", 161 last_ip, 162 new_ip, 163 last_ip, 164 new_ip, 165 ) 166 167 @staticmethod 168 def recheck_session_geo(binding: GeoIPBinding, last_ip: str, new_ip: str): 169 """Check GeoIP binding""" 170 last_geo = GEOIP_CONTEXT_PROCESSOR.city(last_ip) 171 new_geo = GEOIP_CONTEXT_PROCESSOR.city(new_ip) 172 if not last_geo or not new_geo: 173 raise SessionBindingBroken( 174 "geoip.missing", 175 GEOIP_CONTEXT_PROCESSOR.city_to_dict(last_geo), 176 GEOIP_CONTEXT_PROCESSOR.city_to_dict(new_geo), 177 last_ip, 178 new_ip, 179 ) 180 if binding in [ 181 GeoIPBinding.BIND_CONTINENT, 182 GeoIPBinding.BIND_CONTINENT_COUNTRY, 183 GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, 184 ]: 185 # Check Continent which is required for all 3 modes 186 if last_geo.continent != new_geo.continent: 187 raise SessionBindingBroken( 188 "geoip.continent", 189 last_geo.continent.to_dict(), 190 new_geo.continent.to_dict(), 191 last_ip, 192 new_ip, 193 ) 194 if binding in [ 195 GeoIPBinding.BIND_CONTINENT_COUNTRY, 196 GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, 197 ]: 198 # Check Country afterwards 199 if last_geo.country != new_geo.country: 200 raise SessionBindingBroken( 201 "geoip.country", 202 last_geo.country.to_dict(), 203 new_geo.country.to_dict(), 204 last_ip, 205 new_ip, 206 ) 207 if binding in [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY]: 208 # Check city afterwards 209 if last_geo.city != new_geo.city: 210 raise SessionBindingBroken( 211 "geoip.city", 212 last_geo.city.to_dict(), 213 new_geo.city.to_dict(), 214 last_ip, 215 new_ip, 216 )
Sessions bound to ASN/Network and GeoIP/Continent/etc
def
process_request(self, request: django.http.request.HttpRequest):
79 def process_request(self, request: HttpRequest): 80 super().process_request(request) 81 try: 82 self.recheck_session(request) 83 except SessionBindingBroken as exc: 84 LOGGER.warning("Session binding broken", exc=exc) 85 # At this point, we need to logout the current user 86 # however since this middleware has to run before the `AuthenticationMiddleware` 87 # we don't have access to the user yet 88 # Logout will still work, however event logs won't display the user being logged out 89 AuthenticationMiddleware(lambda request: request).process_request(request) 90 logout_extra(request, exc) 91 request.session.clear() 92 return redirect_to_login(request.get_full_path()) 93 return None
def
recheck_session(self, request: django.http.request.HttpRequest):
95 def recheck_session(self, request: HttpRequest): 96 """Check if a session is still valid with a changed IP""" 97 last_ip = request.session.get(Session.Keys.LAST_IP) 98 new_ip = ClientIPMiddleware.get_client_ip(request) 99 # Check changed IP 100 if new_ip == last_ip: 101 return 102 configured_binding_net = request.session.get( 103 SESSION_KEY_BINDING_NET, NetworkBinding.NO_BINDING 104 ) 105 configured_binding_geo = request.session.get( 106 SESSION_KEY_BINDING_GEO, GeoIPBinding.NO_BINDING 107 ) 108 if configured_binding_net != NetworkBinding.NO_BINDING: 109 BoundSessionMiddleware.recheck_session_net(configured_binding_net, last_ip, new_ip) 110 if configured_binding_geo != GeoIPBinding.NO_BINDING: 111 BoundSessionMiddleware.recheck_session_geo(configured_binding_geo, last_ip, new_ip) 112 # If we got to this point without any error being raised, we need to 113 # update the last saved IP to the current one 114 if SESSION_KEY_BINDING_NET in request.session or SESSION_KEY_BINDING_GEO in request.session: 115 # Only set the last IP in the session if there's a binding specified 116 # (== basically requires the user to be logged in) 117 request.session[Session.Keys.LAST_IP] = new_ip
Check if a session is still valid with a changed IP
@staticmethod
def
recheck_session_net( binding: authentik.stages.user_login.models.NetworkBinding, last_ip: str, new_ip: str):
119 @staticmethod 120 def recheck_session_net(binding: NetworkBinding, last_ip: str, new_ip: str): 121 """Check network/ASN binding""" 122 last_asn = ASN_CONTEXT_PROCESSOR.asn(last_ip) 123 new_asn = ASN_CONTEXT_PROCESSOR.asn(new_ip) 124 if not last_asn or not new_asn: 125 raise SessionBindingBroken( 126 "network.missing", 127 ASN_CONTEXT_PROCESSOR.asn_to_dict(last_asn), 128 ASN_CONTEXT_PROCESSOR.asn_to_dict(new_asn), 129 last_ip, 130 new_ip, 131 ) 132 if binding in [ 133 NetworkBinding.BIND_ASN, 134 NetworkBinding.BIND_ASN_NETWORK, 135 NetworkBinding.BIND_ASN_NETWORK_IP, 136 ]: 137 # Check ASN which is required for all 3 modes 138 if last_asn.autonomous_system_number != new_asn.autonomous_system_number: 139 raise SessionBindingBroken( 140 "network.asn", 141 last_asn.autonomous_system_number, 142 new_asn.autonomous_system_number, 143 last_ip, 144 new_ip, 145 ) 146 if binding in [NetworkBinding.BIND_ASN_NETWORK, NetworkBinding.BIND_ASN_NETWORK_IP]: 147 # Check Network afterwards 148 if last_asn.network != new_asn.network: 149 raise SessionBindingBroken( 150 "network.asn_network", 151 str(last_asn.network), 152 str(new_asn.network), 153 last_ip, 154 new_ip, 155 ) 156 if binding in [NetworkBinding.BIND_ASN_NETWORK_IP]: 157 # Only require strict IP checking 158 if last_ip != new_ip: 159 raise SessionBindingBroken( 160 "network.ip", 161 last_ip, 162 new_ip, 163 last_ip, 164 new_ip, 165 )
Check network/ASN binding
@staticmethod
def
recheck_session_geo( binding: authentik.stages.user_login.models.GeoIPBinding, last_ip: str, new_ip: str):
167 @staticmethod 168 def recheck_session_geo(binding: GeoIPBinding, last_ip: str, new_ip: str): 169 """Check GeoIP binding""" 170 last_geo = GEOIP_CONTEXT_PROCESSOR.city(last_ip) 171 new_geo = GEOIP_CONTEXT_PROCESSOR.city(new_ip) 172 if not last_geo or not new_geo: 173 raise SessionBindingBroken( 174 "geoip.missing", 175 GEOIP_CONTEXT_PROCESSOR.city_to_dict(last_geo), 176 GEOIP_CONTEXT_PROCESSOR.city_to_dict(new_geo), 177 last_ip, 178 new_ip, 179 ) 180 if binding in [ 181 GeoIPBinding.BIND_CONTINENT, 182 GeoIPBinding.BIND_CONTINENT_COUNTRY, 183 GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, 184 ]: 185 # Check Continent which is required for all 3 modes 186 if last_geo.continent != new_geo.continent: 187 raise SessionBindingBroken( 188 "geoip.continent", 189 last_geo.continent.to_dict(), 190 new_geo.continent.to_dict(), 191 last_ip, 192 new_ip, 193 ) 194 if binding in [ 195 GeoIPBinding.BIND_CONTINENT_COUNTRY, 196 GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY, 197 ]: 198 # Check Country afterwards 199 if last_geo.country != new_geo.country: 200 raise SessionBindingBroken( 201 "geoip.country", 202 last_geo.country.to_dict(), 203 new_geo.country.to_dict(), 204 last_ip, 205 new_ip, 206 ) 207 if binding in [GeoIPBinding.BIND_CONTINENT_COUNTRY_CITY]: 208 # Check city afterwards 209 if last_geo.city != new_geo.city: 210 raise SessionBindingBroken( 211 "geoip.city", 212 last_geo.city.to_dict(), 213 new_geo.city.to_dict(), 214 last_ip, 215 new_ip, 216 )
Check GeoIP binding