authentik.core.tests.test_source_flow_manager
Test Source flow_manager
"""Test Source flow_manager"""

from django.contrib.auth.models import AnonymousUser
from django.test import TestCase
from django.urls import reverse
from guardian.shortcuts import get_anonymous_user

from authentik.core.models import SourceUserMatchingModes, User
from authentik.core.sources.flow_manager import Action
from authentik.core.sources.stage import PostSourceStage
from authentik.core.tests.utils import RequestFactory, create_test_flow
from authentik.flows.planner import FlowPlan
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.policies.denied import AccessDeniedResponse
from authentik.policies.expression.models import ExpressionPolicy
from authentik.policies.models import PolicyBinding
from authentik.sources.oauth.models import OAuthSource, UserOAuthSourceConnection
from authentik.sources.oauth.views.callback import OAuthSourceFlowManager


class TestSourceFlowManager(TestCase):
    """Test Source flow_manager

    Every test builds a request for some user, wraps it in an
    ``OAuthSourceFlowManager`` and checks which ``Action`` the manager picks;
    several tests additionally inspect the response of ``get_flow()``.
    """

    def setUp(self) -> None:
        """Create two test flows, an OAuth source bound to them, and a request factory."""
        super().setUp()
        self.authentication_flow = create_test_flow()
        self.enrollment_flow = create_test_flow()
        self.source: OAuthSource = OAuthSource.objects.create(
            name=generate_id(),
            slug=generate_id(),
            authentication_flow=self.authentication_flow,
            enrollment_flow=self.enrollment_flow,
        )
        self.identifier = generate_id()
        self.request_factory = RequestFactory()

    def _build_manager(self, user, info=None):
        """Build a GET request for ``user`` and an ``OAuthSourceFlowManager`` around it.

        ``info`` is the dict stored under the ``"info"`` key of the manager's
        fourth argument; it defaults to an empty dict. Returns
        ``(request, flow_manager)`` so callers can inspect the request session.
        """
        request = self.request_factory.get("/", user=user)
        flow_manager = OAuthSourceFlowManager(
            self.source,
            request,
            self.identifier,
            {"info": {} if info is None else info},
            {},
        )
        return request, flow_manager

    def _assert_post_source_plan(self, request, response) -> None:
        """Assert a 302 response and that the session plan starts with ``PostSourceStage``."""
        self.assertEqual(response.status_code, 302)
        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)

    def test_unauthenticated_enroll(self):
        """Test un-authenticated user enrolling"""
        request, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        self._assert_post_source_plan(request, flow_manager.get_flow())

    def test_unauthenticated_auth(self):
        """Test un-authenticated user authenticating"""
        UserOAuthSourceConnection.objects.create(
            user=get_anonymous_user(), source=self.source, identifier=self.identifier
        )
        request, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.AUTH)
        self._assert_post_source_plan(request, flow_manager.get_flow())

    def test_authenticated_link(self):
        """Test authenticated user linking"""
        user = User.objects.create(username="foo", email="foo@bar.baz")
        _, flow_manager = self._build_manager(user)
        action, connection = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        # The connection offered for linking has not been saved yet.
        self.assertIsNone(connection.pk)
        response = flow_manager.get_flow()
        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response.url,
            reverse("authentik_core:if-user") + "#/settings;page-sources",
        )

    def test_authenticated_auth(self):
        """Test authenticated user authenticating"""
        user = User.objects.create(username="foo", email="foo@bar.baz")
        UserOAuthSourceConnection.objects.create(
            user=user, source=self.source, identifier=self.identifier
        )
        _, flow_manager = self._build_manager(user)
        action, connection = flow_manager.get_action()
        self.assertEqual(action, Action.AUTH)
        # The connection created above is the one handed back, so it has a pk.
        self.assertIsNotNone(connection.pk)
        response = flow_manager.get_flow()
        self.assertEqual(response.status_code, 302)

    def test_unauthenticated_link(self):
        """Test un-authenticated user linking"""
        # Uses guardian's anonymous user object rather than Django's AnonymousUser.
        _, flow_manager = self._build_manager(get_anonymous_user())
        action, connection = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        self.assertIsNone(connection.pk)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_email(self):
        """Test un-authenticated user enrolling (link on email)"""
        User.objects.create(username="foo", email="foo@bar.baz")
        self.source.user_matching_mode = SourceUserMatchingModes.EMAIL_LINK

        # Without email, deny
        _, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.DENY)
        flow_manager.get_flow()
        # With email
        _, flow_manager = self._build_manager(AnonymousUser(), {"email": "foo@bar.baz"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_username(self):
        """Test un-authenticated user enrolling (link on username)"""
        User.objects.create(username="foo", email="foo@bar.baz")
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

        # Without username, deny
        _, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.DENY)
        flow_manager.get_flow()
        # With username
        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_username_deny(self):
        """Test un-authenticated user enrolling (deny on username)"""
        User.objects.create(username="foo", email="foo@bar.baz")
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_DENY

        # With non-existent username, enroll
        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "bar"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        flow_manager.get_flow()
        # With the username of the existing user, deny
        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.DENY)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_link_non_existent(self):
        """Test un-authenticated user enrolling (link on username), username doesn't exist"""
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        flow_manager.get_flow()

    def test_error_non_applicable_flow(self):
        """Test error handling when a source selected flow is non-applicable due to a policy"""
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

        # Bind an always-false expression policy (which also emits the message
        # "foo") to a fresh flow and make that flow the source's enrollment flow.
        flow = create_test_flow()
        policy = ExpressionPolicy.objects.create(
            name="false", expression="""ak_message("foo");return False"""
        )
        PolicyBinding.objects.create(
            policy=policy,
            target=flow,
            order=0,
        )
        self.source.enrollment_flow = flow
        self.source.save()

        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        response = flow_manager.get_flow()
        self.assertIsInstance(response, AccessDeniedResponse)

        self.assertEqual(response.error_message, "foo")
class
TestSourceFlowManager(django.test.testcases.TestCase):
class TestSourceFlowManager(TestCase):
    """Test Source flow_manager

    Every test builds a request for some user, wraps it in an
    ``OAuthSourceFlowManager`` and checks which ``Action`` the manager picks;
    several tests additionally inspect the response of ``get_flow()``.
    """

    def setUp(self) -> None:
        """Create two test flows, an OAuth source bound to them, and a request factory."""
        super().setUp()
        self.authentication_flow = create_test_flow()
        self.enrollment_flow = create_test_flow()
        self.source: OAuthSource = OAuthSource.objects.create(
            name=generate_id(),
            slug=generate_id(),
            authentication_flow=self.authentication_flow,
            enrollment_flow=self.enrollment_flow,
        )
        self.identifier = generate_id()
        self.request_factory = RequestFactory()

    def _build_manager(self, user, info=None):
        """Build a GET request for ``user`` and an ``OAuthSourceFlowManager`` around it.

        ``info`` is the dict stored under the ``"info"`` key of the manager's
        fourth argument; it defaults to an empty dict. Returns
        ``(request, flow_manager)`` so callers can inspect the request session.
        """
        request = self.request_factory.get("/", user=user)
        flow_manager = OAuthSourceFlowManager(
            self.source,
            request,
            self.identifier,
            {"info": {} if info is None else info},
            {},
        )
        return request, flow_manager

    def _assert_post_source_plan(self, request, response) -> None:
        """Assert a 302 response and that the session plan starts with ``PostSourceStage``."""
        self.assertEqual(response.status_code, 302)
        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)

    def test_unauthenticated_enroll(self):
        """Test un-authenticated user enrolling"""
        request, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        self._assert_post_source_plan(request, flow_manager.get_flow())

    def test_unauthenticated_auth(self):
        """Test un-authenticated user authenticating"""
        UserOAuthSourceConnection.objects.create(
            user=get_anonymous_user(), source=self.source, identifier=self.identifier
        )
        request, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.AUTH)
        self._assert_post_source_plan(request, flow_manager.get_flow())

    def test_authenticated_link(self):
        """Test authenticated user linking"""
        user = User.objects.create(username="foo", email="foo@bar.baz")
        _, flow_manager = self._build_manager(user)
        action, connection = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        # The connection offered for linking has not been saved yet.
        self.assertIsNone(connection.pk)
        response = flow_manager.get_flow()
        self.assertEqual(response.status_code, 302)
        self.assertEqual(
            response.url,
            reverse("authentik_core:if-user") + "#/settings;page-sources",
        )

    def test_authenticated_auth(self):
        """Test authenticated user authenticating"""
        user = User.objects.create(username="foo", email="foo@bar.baz")
        UserOAuthSourceConnection.objects.create(
            user=user, source=self.source, identifier=self.identifier
        )
        _, flow_manager = self._build_manager(user)
        action, connection = flow_manager.get_action()
        self.assertEqual(action, Action.AUTH)
        # The connection created above is the one handed back, so it has a pk.
        self.assertIsNotNone(connection.pk)
        response = flow_manager.get_flow()
        self.assertEqual(response.status_code, 302)

    def test_unauthenticated_link(self):
        """Test un-authenticated user linking"""
        # Uses guardian's anonymous user object rather than Django's AnonymousUser.
        _, flow_manager = self._build_manager(get_anonymous_user())
        action, connection = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        self.assertIsNone(connection.pk)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_email(self):
        """Test un-authenticated user enrolling (link on email)"""
        User.objects.create(username="foo", email="foo@bar.baz")
        self.source.user_matching_mode = SourceUserMatchingModes.EMAIL_LINK

        # Without email, deny
        _, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.DENY)
        flow_manager.get_flow()
        # With email
        _, flow_manager = self._build_manager(AnonymousUser(), {"email": "foo@bar.baz"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_username(self):
        """Test un-authenticated user enrolling (link on username)"""
        User.objects.create(username="foo", email="foo@bar.baz")
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

        # Without username, deny
        _, flow_manager = self._build_manager(AnonymousUser())
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.DENY)
        flow_manager.get_flow()
        # With username
        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.LINK)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_username_deny(self):
        """Test un-authenticated user enrolling (deny on username)"""
        User.objects.create(username="foo", email="foo@bar.baz")
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_DENY

        # With non-existent username, enroll
        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "bar"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        flow_manager.get_flow()
        # With the username of the existing user, deny
        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.DENY)
        flow_manager.get_flow()

    def test_unauthenticated_enroll_link_non_existent(self):
        """Test un-authenticated user enrolling (link on username), username doesn't exist"""
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        flow_manager.get_flow()

    def test_error_non_applicable_flow(self):
        """Test error handling when a source selected flow is non-applicable due to a policy"""
        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

        # Bind an always-false expression policy (which also emits the message
        # "foo") to a fresh flow and make that flow the source's enrollment flow.
        flow = create_test_flow()
        policy = ExpressionPolicy.objects.create(
            name="false", expression="""ak_message("foo");return False"""
        )
        PolicyBinding.objects.create(
            policy=policy,
            target=flow,
            order=0,
        )
        self.source.enrollment_flow = flow
        self.source.save()

        _, flow_manager = self._build_manager(AnonymousUser(), {"username": "foo"})
        action, _ = flow_manager.get_action()
        self.assertEqual(action, Action.ENROLL)
        response = flow_manager.get_flow()
        self.assertIsInstance(response, AccessDeniedResponse)

        self.assertEqual(response.error_message, "foo")
Test Source flow_manager
def
setUp(self) -> None:
def setUp(self) -> None:
    """Prepare the fixture: two test flows, an OAuth source using them, an identifier
    and a request factory."""
    super().setUp()
    auth_flow = create_test_flow()
    enroll_flow = create_test_flow()
    self.authentication_flow = auth_flow
    self.enrollment_flow = enroll_flow
    # Name and slug are generated so they do not clash with other sources.
    source_fields = {
        "name": generate_id(),
        "slug": generate_id(),
        "authentication_flow": auth_flow,
        "enrollment_flow": enroll_flow,
    }
    self.source: OAuthSource = OAuthSource.objects.create(**source_fields)
    self.request_factory = RequestFactory()
    self.identifier = generate_id()
Hook method for setting up the test fixture before exercising it.
def
test_unauthenticated_enroll(self):
def test_unauthenticated_enroll(self):
    """An anonymous request without a stored connection is enrolled."""
    anon_request = self.request_factory.get("/", user=AnonymousUser())
    source_info = {"info": {}}
    manager = OAuthSourceFlowManager(self.source, anon_request, self.identifier, source_info, {})

    decided, _connection = manager.get_action()
    self.assertEqual(decided, Action.ENROLL)

    redirect = manager.get_flow()
    self.assertEqual(redirect.status_code, 302)
    # The plan stored in the session must begin with the post-source stage.
    plan: FlowPlan = anon_request.session[SESSION_KEY_PLAN]
    first_binding = plan.bindings[0]
    self.assertEqual(first_binding.stage.view, PostSourceStage)
Test un-authenticated user enrolling
def
test_unauthenticated_auth(self):
def test_unauthenticated_auth(self):
    """An anonymous request whose identifier already has a connection authenticates."""
    existing_link = {
        "user": get_anonymous_user(),
        "source": self.source,
        "identifier": self.identifier,
    }
    UserOAuthSourceConnection.objects.create(**existing_link)

    anon_request = self.request_factory.get("/", user=AnonymousUser())
    source_info = {"info": {}}
    manager = OAuthSourceFlowManager(self.source, anon_request, self.identifier, source_info, {})

    decided, _connection = manager.get_action()
    self.assertEqual(decided, Action.AUTH)

    redirect = manager.get_flow()
    self.assertEqual(redirect.status_code, 302)
    # The plan stored in the session must begin with the post-source stage.
    plan: FlowPlan = anon_request.session[SESSION_KEY_PLAN]
    first_binding = plan.bindings[0]
    self.assertEqual(first_binding.stage.view, PostSourceStage)
Test un-authenticated user authenticating
def
test_authenticated_link(self):
def test_authenticated_link(self):
    """A logged-in user without a connection is offered a link and sent to settings."""
    account = User.objects.create(username="foo", email="foo@bar.baz")
    user_request = self.request_factory.get("/", user=account)
    source_info = {"info": {}}
    manager = OAuthSourceFlowManager(self.source, user_request, self.identifier, source_info, {})

    decided, new_connection = manager.get_action()
    self.assertEqual(decided, Action.LINK)
    # The connection has not been written to the database yet.
    self.assertIsNone(new_connection.pk)

    redirect = manager.get_flow()
    self.assertEqual(redirect.status_code, 302)
    expected_url = reverse("authentik_core:if-user") + "#/settings;page-sources"
    self.assertEqual(redirect.url, expected_url)
Test authenticated user linking
def
test_authenticated_auth(self):
def test_authenticated_auth(self):
    """Test authenticated user authenticating"""
    user = User.objects.create(username="foo", email="foo@bar.baz")
    # A connection for this user/source/identifier already exists ...
    UserOAuthSourceConnection.objects.create(
        user=user, source=self.source, identifier=self.identifier
    )
    request = self.request_factory.get("/", user=user)
    flow_manager = OAuthSourceFlowManager(
        self.source, request, self.identifier, {"info": {}}, {}
    )
    action, connection = flow_manager.get_action()
    # ... so the manager authenticates instead of linking, and hands back a
    # saved connection (it has a primary key).
    self.assertEqual(action, Action.AUTH)
    self.assertIsNotNone(connection.pk)
    response = flow_manager.get_flow()
    self.assertEqual(response.status_code, 302)
Test authenticated user authenticating
def
test_unauthenticated_link(self):
def test_unauthenticated_link(self):
    """A request made as guardian's anonymous user object is offered a link."""
    guardian_request = self.request_factory.get("/", user=get_anonymous_user())
    source_info = {"info": {}}
    manager = OAuthSourceFlowManager(
        self.source, guardian_request, self.identifier, source_info, {}
    )

    decided, new_connection = manager.get_action()
    self.assertEqual(decided, Action.LINK)
    # The connection has not been written to the database yet.
    self.assertIsNone(new_connection.pk)
    manager.get_flow()
Test un-authenticated user linking
def
test_unauthenticated_enroll_email(self):
def test_unauthenticated_enroll_email(self):
    """Anonymous enrollment with email-link matching: deny without email, link with it."""
    User.objects.create(username="foo", email="foo@bar.baz")
    self.source.user_matching_mode = SourceUserMatchingModes.EMAIL_LINK

    def manager_for(info):
        # Fresh anonymous request and manager for the given source info.
        anon_request = self.request_factory.get("/", user=AnonymousUser())
        return OAuthSourceFlowManager(
            self.source, anon_request, self.identifier, {"info": info}, {}
        )

    # The source supplies no email: denied.
    without_email = manager_for({})
    decided, _connection = without_email.get_action()
    self.assertEqual(decided, Action.DENY)
    without_email.get_flow()

    # The source supplies the existing user's email: linked.
    with_email = manager_for({"email": "foo@bar.baz"})
    decided, _connection = with_email.get_action()
    self.assertEqual(decided, Action.LINK)
    with_email.get_flow()
Test un-authenticated user enrolling (link on email)
def
test_unauthenticated_enroll_username(self):
def test_unauthenticated_enroll_username(self):
    """Anonymous enrollment with username-link matching: deny without username, link with it."""
    User.objects.create(username="foo", email="foo@bar.baz")
    self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

    def manager_for(info):
        # Fresh anonymous request and manager for the given source info.
        anon_request = self.request_factory.get("/", user=AnonymousUser())
        return OAuthSourceFlowManager(
            self.source, anon_request, self.identifier, {"info": info}, {}
        )

    # The source supplies no username: denied.
    without_username = manager_for({})
    decided, _connection = without_username.get_action()
    self.assertEqual(decided, Action.DENY)
    without_username.get_flow()

    # The source supplies the existing user's username: linked.
    with_username = manager_for({"username": "foo"})
    decided, _connection = with_username.get_action()
    self.assertEqual(decided, Action.LINK)
    with_username.get_flow()
Test un-authenticated user enrolling (link on username)
def
test_unauthenticated_enroll_username_deny(self):
def test_unauthenticated_enroll_username_deny(self):
    """Anonymous enrollment with username-deny matching: enroll if unused, deny if taken."""
    User.objects.create(username="foo", email="foo@bar.baz")
    self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_DENY

    def manager_for(info):
        # Fresh anonymous request and manager for the given source info.
        anon_request = self.request_factory.get("/", user=AnonymousUser())
        return OAuthSourceFlowManager(
            self.source, anon_request, self.identifier, {"info": info}, {}
        )

    # Username that no user has: enrolled.
    unused_name = manager_for({"username": "bar"})
    decided, _connection = unused_name.get_action()
    self.assertEqual(decided, Action.ENROLL)
    unused_name.get_flow()

    # Username of the user created above: denied.
    taken_name = manager_for({"username": "foo"})
    decided, _connection = taken_name.get_action()
    self.assertEqual(decided, Action.DENY)
    taken_name.get_flow()
Test un-authenticated user enrolling (deny on username)
def
test_unauthenticated_enroll_link_non_existent(self):
def test_unauthenticated_enroll_link_non_existent(self):
    """Username-link matching with a username this test never created results in enrollment."""
    self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

    anon_request = self.request_factory.get("/", user=AnonymousUser())
    source_info = {"info": {"username": "foo"}}
    manager = OAuthSourceFlowManager(self.source, anon_request, self.identifier, source_info, {})

    decided, _connection = manager.get_action()
    self.assertEqual(decided, Action.ENROLL)
    manager.get_flow()
Test un-authenticated user enrolling (link on username), username doesn't exist
def
test_error_non_applicable_flow(self):
def test_error_non_applicable_flow(self):
    """A policy-rejected enrollment flow yields an access-denied response with the
    policy's message."""
    self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK

    # Fresh flow guarded by an expression policy that emits "foo" and returns False.
    blocked_flow = create_test_flow()
    rejecting_policy = ExpressionPolicy.objects.create(
        name="false",
        expression="""ak_message("foo");return False""",
    )
    PolicyBinding.objects.create(policy=rejecting_policy, target=blocked_flow, order=0)
    self.source.enrollment_flow = blocked_flow
    self.source.save()

    anon_request = self.request_factory.get("/", user=AnonymousUser())
    source_info = {"info": {"username": "foo"}}
    manager = OAuthSourceFlowManager(self.source, anon_request, self.identifier, source_info, {})

    decided, _connection = manager.get_action()
    self.assertEqual(decided, Action.ENROLL)

    denied = manager.get_flow()
    self.assertIsInstance(denied, AccessDeniedResponse)
    self.assertEqual(denied.error_message, "foo")
Test error handling when a source selected flow is non-applicable due to a policy