authentik.core.tests.test_source_flow_manager

Test Source flow_manager

  1"""Test Source flow_manager"""
  2
  3from django.contrib.auth.models import AnonymousUser
  4from django.test import TestCase
  5from django.urls import reverse
  6from guardian.shortcuts import get_anonymous_user
  7
  8from authentik.core.models import SourceUserMatchingModes, User
  9from authentik.core.sources.flow_manager import Action
 10from authentik.core.sources.stage import PostSourceStage
 11from authentik.core.tests.utils import RequestFactory, create_test_flow
 12from authentik.flows.planner import FlowPlan
 13from authentik.flows.views.executor import SESSION_KEY_PLAN
 14from authentik.lib.generators import generate_id
 15from authentik.policies.denied import AccessDeniedResponse
 16from authentik.policies.expression.models import ExpressionPolicy
 17from authentik.policies.models import PolicyBinding
 18from authentik.sources.oauth.models import OAuthSource, UserOAuthSourceConnection
 19from authentik.sources.oauth.views.callback import OAuthSourceFlowManager
 20
 21
 22class TestSourceFlowManager(TestCase):
 23    """Test Source flow_manager"""
 24
 25    def setUp(self) -> None:
 26        super().setUp()
 27        self.authentication_flow = create_test_flow()
 28        self.enrollment_flow = create_test_flow()
 29        self.source: OAuthSource = OAuthSource.objects.create(
 30            name=generate_id(),
 31            slug=generate_id(),
 32            authentication_flow=self.authentication_flow,
 33            enrollment_flow=self.enrollment_flow,
 34        )
 35        self.identifier = generate_id()
 36        self.request_factory = RequestFactory()
 37
 38    def test_unauthenticated_enroll(self):
 39        """Test un-authenticated user enrolling"""
 40        request = self.request_factory.get("/", user=AnonymousUser())
 41        flow_manager = OAuthSourceFlowManager(
 42            self.source, request, self.identifier, {"info": {}}, {}
 43        )
 44        action, _ = flow_manager.get_action()
 45        self.assertEqual(action, Action.ENROLL)
 46        response = flow_manager.get_flow()
 47        self.assertEqual(response.status_code, 302)
 48        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
 49        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)
 50
 51    def test_unauthenticated_auth(self):
 52        """Test un-authenticated user authenticating"""
 53        UserOAuthSourceConnection.objects.create(
 54            user=get_anonymous_user(), source=self.source, identifier=self.identifier
 55        )
 56        request = self.request_factory.get("/", user=AnonymousUser())
 57        flow_manager = OAuthSourceFlowManager(
 58            self.source, request, self.identifier, {"info": {}}, {}
 59        )
 60        action, _ = flow_manager.get_action()
 61        self.assertEqual(action, Action.AUTH)
 62        response = flow_manager.get_flow()
 63        self.assertEqual(response.status_code, 302)
 64        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
 65        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)
 66
 67    def test_authenticated_link(self):
 68        """Test authenticated user linking"""
 69        user = User.objects.create(username="foo", email="foo@bar.baz")
 70        request = self.request_factory.get("/", user=user)
 71        flow_manager = OAuthSourceFlowManager(
 72            self.source, request, self.identifier, {"info": {}}, {}
 73        )
 74        action, connection = flow_manager.get_action()
 75        self.assertEqual(action, Action.LINK)
 76        self.assertIsNone(connection.pk)
 77        response = flow_manager.get_flow()
 78        self.assertEqual(response.status_code, 302)
 79        self.assertEqual(
 80            response.url,
 81            reverse("authentik_core:if-user") + "#/settings;page-sources",
 82        )
 83
 84    def test_authenticated_auth(self):
 85        """Test authenticated user linking"""
 86        user = User.objects.create(username="foo", email="foo@bar.baz")
 87        UserOAuthSourceConnection.objects.create(
 88            user=user, source=self.source, identifier=self.identifier
 89        )
 90        request = self.request_factory.get("/", user=user)
 91        flow_manager = OAuthSourceFlowManager(
 92            self.source, request, self.identifier, {"info": {}}, {}
 93        )
 94        action, connection = flow_manager.get_action()
 95        self.assertEqual(action, Action.AUTH)
 96        self.assertIsNotNone(connection.pk)
 97        response = flow_manager.get_flow()
 98        self.assertEqual(response.status_code, 302)
 99
100    def test_unauthenticated_link(self):
101        """Test un-authenticated user linking"""
102        flow_manager = OAuthSourceFlowManager(
103            self.source,
104            self.request_factory.get("/", user=get_anonymous_user()),
105            self.identifier,
106            {"info": {}},
107            {},
108        )
109        action, connection = flow_manager.get_action()
110        self.assertEqual(action, Action.LINK)
111        self.assertIsNone(connection.pk)
112        flow_manager.get_flow()
113
114    def test_unauthenticated_enroll_email(self):
115        """Test un-authenticated user enrolling (link on email)"""
116        User.objects.create(username="foo", email="foo@bar.baz")
117        self.source.user_matching_mode = SourceUserMatchingModes.EMAIL_LINK
118
119        # Without email, deny
120        flow_manager = OAuthSourceFlowManager(
121            self.source,
122            self.request_factory.get("/", user=AnonymousUser()),
123            self.identifier,
124            {"info": {}},
125            {},
126        )
127        action, _ = flow_manager.get_action()
128        self.assertEqual(action, Action.DENY)
129        flow_manager.get_flow()
130        # With email
131        flow_manager = OAuthSourceFlowManager(
132            self.source,
133            self.request_factory.get("/", user=AnonymousUser()),
134            self.identifier,
135            {
136                "info": {
137                    "email": "foo@bar.baz",
138                },
139            },
140            {},
141        )
142        action, _ = flow_manager.get_action()
143        self.assertEqual(action, Action.LINK)
144        flow_manager.get_flow()
145
146    def test_unauthenticated_enroll_username(self):
147        """Test un-authenticated user enrolling (link on username)"""
148        User.objects.create(username="foo", email="foo@bar.baz")
149        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
150
151        # Without username, deny
152        flow_manager = OAuthSourceFlowManager(
153            self.source,
154            self.request_factory.get("/", user=AnonymousUser()),
155            self.identifier,
156            {"info": {}},
157            {},
158        )
159        action, _ = flow_manager.get_action()
160        self.assertEqual(action, Action.DENY)
161        flow_manager.get_flow()
162        # With username
163        flow_manager = OAuthSourceFlowManager(
164            self.source,
165            self.request_factory.get("/", user=AnonymousUser()),
166            self.identifier,
167            {
168                "info": {"username": "foo"},
169            },
170            {},
171        )
172        action, _ = flow_manager.get_action()
173        self.assertEqual(action, Action.LINK)
174        flow_manager.get_flow()
175
176    def test_unauthenticated_enroll_username_deny(self):
177        """Test un-authenticated user enrolling (deny on username)"""
178        User.objects.create(username="foo", email="foo@bar.baz")
179        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_DENY
180
181        # With non-existent username, enroll
182        flow_manager = OAuthSourceFlowManager(
183            self.source,
184            self.request_factory.get("/", user=AnonymousUser()),
185            self.identifier,
186            {
187                "info": {
188                    "username": "bar",
189                },
190            },
191            {},
192        )
193        action, _ = flow_manager.get_action()
194        self.assertEqual(action, Action.ENROLL)
195        flow_manager.get_flow()
196        # With username
197        flow_manager = OAuthSourceFlowManager(
198            self.source,
199            self.request_factory.get("/", user=AnonymousUser()),
200            self.identifier,
201            {
202                "info": {"username": "foo"},
203            },
204            {},
205        )
206        action, _ = flow_manager.get_action()
207        self.assertEqual(action, Action.DENY)
208        flow_manager.get_flow()
209
210    def test_unauthenticated_enroll_link_non_existent(self):
211        """Test un-authenticated user enrolling (link on username), username doesn't exist"""
212        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
213
214        flow_manager = OAuthSourceFlowManager(
215            self.source,
216            self.request_factory.get("/", user=AnonymousUser()),
217            self.identifier,
218            {
219                "info": {"username": "foo"},
220            },
221            {},
222        )
223        action, _ = flow_manager.get_action()
224        self.assertEqual(action, Action.ENROLL)
225        flow_manager.get_flow()
226
227    def test_error_non_applicable_flow(self):
228        """Test error handling when a source selected flow is non-applicable due to a policy"""
229        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
230
231        flow = create_test_flow()
232        policy = ExpressionPolicy.objects.create(
233            name="false", expression="""ak_message("foo");return False"""
234        )
235        PolicyBinding.objects.create(
236            policy=policy,
237            target=flow,
238            order=0,
239        )
240        self.source.enrollment_flow = flow
241        self.source.save()
242
243        flow_manager = OAuthSourceFlowManager(
244            self.source,
245            self.request_factory.get("/", user=AnonymousUser()),
246            self.identifier,
247            {
248                "info": {"username": "foo"},
249            },
250            {},
251        )
252        action, _ = flow_manager.get_action()
253        self.assertEqual(action, Action.ENROLL)
254        response = flow_manager.get_flow()
255        self.assertIsInstance(response, AccessDeniedResponse)
256
257        self.assertEqual(response.error_message, "foo")
class TestSourceFlowManager(django.test.testcases.TestCase):
 23class TestSourceFlowManager(TestCase):
 24    """Test Source flow_manager"""
 25
 26    def setUp(self) -> None:
 27        super().setUp()
 28        self.authentication_flow = create_test_flow()
 29        self.enrollment_flow = create_test_flow()
 30        self.source: OAuthSource = OAuthSource.objects.create(
 31            name=generate_id(),
 32            slug=generate_id(),
 33            authentication_flow=self.authentication_flow,
 34            enrollment_flow=self.enrollment_flow,
 35        )
 36        self.identifier = generate_id()
 37        self.request_factory = RequestFactory()
 38
 39    def test_unauthenticated_enroll(self):
 40        """Test un-authenticated user enrolling"""
 41        request = self.request_factory.get("/", user=AnonymousUser())
 42        flow_manager = OAuthSourceFlowManager(
 43            self.source, request, self.identifier, {"info": {}}, {}
 44        )
 45        action, _ = flow_manager.get_action()
 46        self.assertEqual(action, Action.ENROLL)
 47        response = flow_manager.get_flow()
 48        self.assertEqual(response.status_code, 302)
 49        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
 50        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)
 51
 52    def test_unauthenticated_auth(self):
 53        """Test un-authenticated user authenticating"""
 54        UserOAuthSourceConnection.objects.create(
 55            user=get_anonymous_user(), source=self.source, identifier=self.identifier
 56        )
 57        request = self.request_factory.get("/", user=AnonymousUser())
 58        flow_manager = OAuthSourceFlowManager(
 59            self.source, request, self.identifier, {"info": {}}, {}
 60        )
 61        action, _ = flow_manager.get_action()
 62        self.assertEqual(action, Action.AUTH)
 63        response = flow_manager.get_flow()
 64        self.assertEqual(response.status_code, 302)
 65        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
 66        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)
 67
 68    def test_authenticated_link(self):
 69        """Test authenticated user linking"""
 70        user = User.objects.create(username="foo", email="foo@bar.baz")
 71        request = self.request_factory.get("/", user=user)
 72        flow_manager = OAuthSourceFlowManager(
 73            self.source, request, self.identifier, {"info": {}}, {}
 74        )
 75        action, connection = flow_manager.get_action()
 76        self.assertEqual(action, Action.LINK)
 77        self.assertIsNone(connection.pk)
 78        response = flow_manager.get_flow()
 79        self.assertEqual(response.status_code, 302)
 80        self.assertEqual(
 81            response.url,
 82            reverse("authentik_core:if-user") + "#/settings;page-sources",
 83        )
 84
 85    def test_authenticated_auth(self):
 86        """Test authenticated user linking"""
 87        user = User.objects.create(username="foo", email="foo@bar.baz")
 88        UserOAuthSourceConnection.objects.create(
 89            user=user, source=self.source, identifier=self.identifier
 90        )
 91        request = self.request_factory.get("/", user=user)
 92        flow_manager = OAuthSourceFlowManager(
 93            self.source, request, self.identifier, {"info": {}}, {}
 94        )
 95        action, connection = flow_manager.get_action()
 96        self.assertEqual(action, Action.AUTH)
 97        self.assertIsNotNone(connection.pk)
 98        response = flow_manager.get_flow()
 99        self.assertEqual(response.status_code, 302)
100
101    def test_unauthenticated_link(self):
102        """Test un-authenticated user linking"""
103        flow_manager = OAuthSourceFlowManager(
104            self.source,
105            self.request_factory.get("/", user=get_anonymous_user()),
106            self.identifier,
107            {"info": {}},
108            {},
109        )
110        action, connection = flow_manager.get_action()
111        self.assertEqual(action, Action.LINK)
112        self.assertIsNone(connection.pk)
113        flow_manager.get_flow()
114
115    def test_unauthenticated_enroll_email(self):
116        """Test un-authenticated user enrolling (link on email)"""
117        User.objects.create(username="foo", email="foo@bar.baz")
118        self.source.user_matching_mode = SourceUserMatchingModes.EMAIL_LINK
119
120        # Without email, deny
121        flow_manager = OAuthSourceFlowManager(
122            self.source,
123            self.request_factory.get("/", user=AnonymousUser()),
124            self.identifier,
125            {"info": {}},
126            {},
127        )
128        action, _ = flow_manager.get_action()
129        self.assertEqual(action, Action.DENY)
130        flow_manager.get_flow()
131        # With email
132        flow_manager = OAuthSourceFlowManager(
133            self.source,
134            self.request_factory.get("/", user=AnonymousUser()),
135            self.identifier,
136            {
137                "info": {
138                    "email": "foo@bar.baz",
139                },
140            },
141            {},
142        )
143        action, _ = flow_manager.get_action()
144        self.assertEqual(action, Action.LINK)
145        flow_manager.get_flow()
146
147    def test_unauthenticated_enroll_username(self):
148        """Test un-authenticated user enrolling (link on username)"""
149        User.objects.create(username="foo", email="foo@bar.baz")
150        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
151
152        # Without username, deny
153        flow_manager = OAuthSourceFlowManager(
154            self.source,
155            self.request_factory.get("/", user=AnonymousUser()),
156            self.identifier,
157            {"info": {}},
158            {},
159        )
160        action, _ = flow_manager.get_action()
161        self.assertEqual(action, Action.DENY)
162        flow_manager.get_flow()
163        # With username
164        flow_manager = OAuthSourceFlowManager(
165            self.source,
166            self.request_factory.get("/", user=AnonymousUser()),
167            self.identifier,
168            {
169                "info": {"username": "foo"},
170            },
171            {},
172        )
173        action, _ = flow_manager.get_action()
174        self.assertEqual(action, Action.LINK)
175        flow_manager.get_flow()
176
177    def test_unauthenticated_enroll_username_deny(self):
178        """Test un-authenticated user enrolling (deny on username)"""
179        User.objects.create(username="foo", email="foo@bar.baz")
180        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_DENY
181
182        # With non-existent username, enroll
183        flow_manager = OAuthSourceFlowManager(
184            self.source,
185            self.request_factory.get("/", user=AnonymousUser()),
186            self.identifier,
187            {
188                "info": {
189                    "username": "bar",
190                },
191            },
192            {},
193        )
194        action, _ = flow_manager.get_action()
195        self.assertEqual(action, Action.ENROLL)
196        flow_manager.get_flow()
197        # With username
198        flow_manager = OAuthSourceFlowManager(
199            self.source,
200            self.request_factory.get("/", user=AnonymousUser()),
201            self.identifier,
202            {
203                "info": {"username": "foo"},
204            },
205            {},
206        )
207        action, _ = flow_manager.get_action()
208        self.assertEqual(action, Action.DENY)
209        flow_manager.get_flow()
210
211    def test_unauthenticated_enroll_link_non_existent(self):
212        """Test un-authenticated user enrolling (link on username), username doesn't exist"""
213        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
214
215        flow_manager = OAuthSourceFlowManager(
216            self.source,
217            self.request_factory.get("/", user=AnonymousUser()),
218            self.identifier,
219            {
220                "info": {"username": "foo"},
221            },
222            {},
223        )
224        action, _ = flow_manager.get_action()
225        self.assertEqual(action, Action.ENROLL)
226        flow_manager.get_flow()
227
228    def test_error_non_applicable_flow(self):
229        """Test error handling when a source selected flow is non-applicable due to a policy"""
230        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
231
232        flow = create_test_flow()
233        policy = ExpressionPolicy.objects.create(
234            name="false", expression="""ak_message("foo");return False"""
235        )
236        PolicyBinding.objects.create(
237            policy=policy,
238            target=flow,
239            order=0,
240        )
241        self.source.enrollment_flow = flow
242        self.source.save()
243
244        flow_manager = OAuthSourceFlowManager(
245            self.source,
246            self.request_factory.get("/", user=AnonymousUser()),
247            self.identifier,
248            {
249                "info": {"username": "foo"},
250            },
251            {},
252        )
253        action, _ = flow_manager.get_action()
254        self.assertEqual(action, Action.ENROLL)
255        response = flow_manager.get_flow()
256        self.assertIsInstance(response, AccessDeniedResponse)
257
258        self.assertEqual(response.error_message, "foo")

Test Source flow_manager

def setUp(self) -> None:
26    def setUp(self) -> None:
27        super().setUp()
28        self.authentication_flow = create_test_flow()
29        self.enrollment_flow = create_test_flow()
30        self.source: OAuthSource = OAuthSource.objects.create(
31            name=generate_id(),
32            slug=generate_id(),
33            authentication_flow=self.authentication_flow,
34            enrollment_flow=self.enrollment_flow,
35        )
36        self.identifier = generate_id()
37        self.request_factory = RequestFactory()

Hook method for setting up the test fixture before exercising it.

def test_unauthenticated_enroll(self):
39    def test_unauthenticated_enroll(self):
40        """Test un-authenticated user enrolling"""
41        request = self.request_factory.get("/", user=AnonymousUser())
42        flow_manager = OAuthSourceFlowManager(
43            self.source, request, self.identifier, {"info": {}}, {}
44        )
45        action, _ = flow_manager.get_action()
46        self.assertEqual(action, Action.ENROLL)
47        response = flow_manager.get_flow()
48        self.assertEqual(response.status_code, 302)
49        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
50        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)

Test un-authenticated user enrolling

def test_unauthenticated_auth(self):
52    def test_unauthenticated_auth(self):
53        """Test un-authenticated user authenticating"""
54        UserOAuthSourceConnection.objects.create(
55            user=get_anonymous_user(), source=self.source, identifier=self.identifier
56        )
57        request = self.request_factory.get("/", user=AnonymousUser())
58        flow_manager = OAuthSourceFlowManager(
59            self.source, request, self.identifier, {"info": {}}, {}
60        )
61        action, _ = flow_manager.get_action()
62        self.assertEqual(action, Action.AUTH)
63        response = flow_manager.get_flow()
64        self.assertEqual(response.status_code, 302)
65        flow_plan: FlowPlan = request.session[SESSION_KEY_PLAN]
66        self.assertEqual(flow_plan.bindings[0].stage.view, PostSourceStage)

Test un-authenticated user authenticating

def test_authenticated_auth(self):
85    def test_authenticated_auth(self):
86        """Test authenticated user linking"""
87        user = User.objects.create(username="foo", email="foo@bar.baz")
88        UserOAuthSourceConnection.objects.create(
89            user=user, source=self.source, identifier=self.identifier
90        )
91        request = self.request_factory.get("/", user=user)
92        flow_manager = OAuthSourceFlowManager(
93            self.source, request, self.identifier, {"info": {}}, {}
94        )
95        action, connection = flow_manager.get_action()
96        self.assertEqual(action, Action.AUTH)
97        self.assertIsNotNone(connection.pk)
98        response = flow_manager.get_flow()
99        self.assertEqual(response.status_code, 302)

Test authenticated user authenticating

def test_unauthenticated_enroll_email(self):
115    def test_unauthenticated_enroll_email(self):
116        """Test un-authenticated user enrolling (link on email)"""
117        User.objects.create(username="foo", email="foo@bar.baz")
118        self.source.user_matching_mode = SourceUserMatchingModes.EMAIL_LINK
119
120        # Without email, deny
121        flow_manager = OAuthSourceFlowManager(
122            self.source,
123            self.request_factory.get("/", user=AnonymousUser()),
124            self.identifier,
125            {"info": {}},
126            {},
127        )
128        action, _ = flow_manager.get_action()
129        self.assertEqual(action, Action.DENY)
130        flow_manager.get_flow()
131        # With email
132        flow_manager = OAuthSourceFlowManager(
133            self.source,
134            self.request_factory.get("/", user=AnonymousUser()),
135            self.identifier,
136            {
137                "info": {
138                    "email": "foo@bar.baz",
139                },
140            },
141            {},
142        )
143        action, _ = flow_manager.get_action()
144        self.assertEqual(action, Action.LINK)
145        flow_manager.get_flow()

Test un-authenticated user enrolling (link on email)

def test_unauthenticated_enroll_username(self):
147    def test_unauthenticated_enroll_username(self):
148        """Test un-authenticated user enrolling (link on username)"""
149        User.objects.create(username="foo", email="foo@bar.baz")
150        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
151
152        # Without username, deny
153        flow_manager = OAuthSourceFlowManager(
154            self.source,
155            self.request_factory.get("/", user=AnonymousUser()),
156            self.identifier,
157            {"info": {}},
158            {},
159        )
160        action, _ = flow_manager.get_action()
161        self.assertEqual(action, Action.DENY)
162        flow_manager.get_flow()
163        # With username
164        flow_manager = OAuthSourceFlowManager(
165            self.source,
166            self.request_factory.get("/", user=AnonymousUser()),
167            self.identifier,
168            {
169                "info": {"username": "foo"},
170            },
171            {},
172        )
173        action, _ = flow_manager.get_action()
174        self.assertEqual(action, Action.LINK)
175        flow_manager.get_flow()

Test un-authenticated user enrolling (link on username)

def test_unauthenticated_enroll_username_deny(self):
177    def test_unauthenticated_enroll_username_deny(self):
178        """Test un-authenticated user enrolling (deny on username)"""
179        User.objects.create(username="foo", email="foo@bar.baz")
180        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_DENY
181
182        # With non-existent username, enroll
183        flow_manager = OAuthSourceFlowManager(
184            self.source,
185            self.request_factory.get("/", user=AnonymousUser()),
186            self.identifier,
187            {
188                "info": {
189                    "username": "bar",
190                },
191            },
192            {},
193        )
194        action, _ = flow_manager.get_action()
195        self.assertEqual(action, Action.ENROLL)
196        flow_manager.get_flow()
197        # With username
198        flow_manager = OAuthSourceFlowManager(
199            self.source,
200            self.request_factory.get("/", user=AnonymousUser()),
201            self.identifier,
202            {
203                "info": {"username": "foo"},
204            },
205            {},
206        )
207        action, _ = flow_manager.get_action()
208        self.assertEqual(action, Action.DENY)
209        flow_manager.get_flow()

Test un-authenticated user enrolling (deny on username)

def test_error_non_applicable_flow(self):
228    def test_error_non_applicable_flow(self):
229        """Test error handling when a source selected flow is non-applicable due to a policy"""
230        self.source.user_matching_mode = SourceUserMatchingModes.USERNAME_LINK
231
232        flow = create_test_flow()
233        policy = ExpressionPolicy.objects.create(
234            name="false", expression="""ak_message("foo");return False"""
235        )
236        PolicyBinding.objects.create(
237            policy=policy,
238            target=flow,
239            order=0,
240        )
241        self.source.enrollment_flow = flow
242        self.source.save()
243
244        flow_manager = OAuthSourceFlowManager(
245            self.source,
246            self.request_factory.get("/", user=AnonymousUser()),
247            self.identifier,
248            {
249                "info": {"username": "foo"},
250            },
251            {},
252        )
253        action, _ = flow_manager.get_action()
254        self.assertEqual(action, Action.ENROLL)
255        response = flow_manager.get_flow()
256        self.assertIsInstance(response, AccessDeniedResponse)
257
258        self.assertEqual(response.error_message, "foo")

Test error handling when a source selected flow is non-applicable due to a policy