authentik.enterprise.providers.ssf.tests.test_stream

  1from uuid import uuid4
  2
  3from django.urls import reverse
  4from rest_framework.test import APITestCase
  5
  6from authentik.core.models import Application
  7from authentik.core.tests.utils import create_test_cert
  8from authentik.enterprise.providers.ssf.models import (
  9    SSFEventStatus,
 10    SSFProvider,
 11    Stream,
 12    StreamEvent,
 13    StreamStatus,
 14)
 15from authentik.lib.generators import generate_id
 16
 17
 18class TestStream(APITestCase):  # API tests for the SSF stream endpoints
 19    def setUp(self):  # create an application and an SSF provider attached to it
 20        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 21        self.provider = SSFProvider.objects.create(
 22            name=generate_id(),
 23            signing_key=create_test_cert(),
 24            backchannel_application=self.application,
 25        )
 26
 27    def test_stream_add_token(self):
 28        """test stream add (token auth), expecting 201 and a verification event"""
 29        res = self.client.post(
 30            reverse(
 31                "authentik_providers_ssf:stream",
 32                kwargs={"application_slug": self.application.slug},
 33            ),
 34            data={
 35                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 36                "aud": ["https://app.authentik.company"],
 37                "delivery": {
 38                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 39                    "endpoint_url": "https://app.authentik.company",
 40                },
 41                "events_requested": [
 42                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 43                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 44                ],
 45                "format": "iss_sub",
 46            },
 47            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 48        )
 49        self.assertEqual(res.status_code, 201)
 50        stream = Stream.objects.filter(provider=self.provider).first()
 51        self.assertIsNotNone(stream)
 52        event = StreamEvent.objects.filter(stream=stream).first()
 53        self.assertIsNotNone(event)
 54        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 55        self.assertEqual(
 56            event.payload["events"],
 57            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
 58        )
 59
 60    def test_stream_add_poll(self):
 61        """test stream add - poll method, expecting a 400 validation error"""
 62        res = self.client.post(
 63            reverse(
 64                "authentik_providers_ssf:stream",
 65                kwargs={"application_slug": self.application.slug},
 66            ),
 67            data={
 68                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 69                "aud": ["https://app.authentik.company"],
 70                "delivery": {
 71                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/poll",
 72                },
 73                "events_requested": [
 74                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 75                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 76                ],
 77                "format": "iss_sub",
 78            },
 79            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 80        )
 81        self.assertEqual(res.status_code, 400)
 82        self.assertJSONEqual(
 83            res.content,
 84            {"delivery": {"method": ["Polling for SSF events is not currently supported."]}},
 85        )
 86
 87    def test_stream_delete(self):
 88        """delete stream, expecting 204 and the stream to be disabled"""
 89        stream = Stream.objects.create(provider=self.provider)
 90        res = self.client.delete(
 91            reverse(
 92                "authentik_providers_ssf:stream",
 93                kwargs={"application_slug": self.application.slug},
 94            ),
 95            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 96        )
 97        self.assertEqual(res.status_code, 204)
 98        stream.refresh_from_db()
 99        self.assertEqual(stream.status, StreamStatus.DISABLED)
100
101    def test_stream_get(self):
102        """get stream, expecting 200"""
103        Stream.objects.create(provider=self.provider)
104        res = self.client.get(
105            reverse(
106                "authentik_providers_ssf:stream",
107                kwargs={"application_slug": self.application.slug},
108            ),
109            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
110        )
111        self.assertEqual(res.status_code, 200)
112
113    def test_stream_get_filter_query(self):
114        """get stream filtered by the stream_id query parameter"""
115        other_stream = Stream.objects.create(provider=self.provider)
116        stream = Stream.objects.create(provider=self.provider)
117        res = self.client.get(
118            reverse(
119                "authentik_providers_ssf:stream",
120                kwargs={"application_slug": self.application.slug},
121            )
122            + f"?stream_id={stream.pk}",
123            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
124        )
125        self.assertEqual(res.status_code, 200)
126        self.assertIn(str(stream.pk), res.content.decode())
127        self.assertNotIn(str(other_stream.pk), res.content.decode())
128
129    def test_stream_patch(self):
130        """patch stream selected by stream_id in the request body"""
131        other_stream = Stream.objects.create(provider=self.provider)
132        stream = Stream.objects.create(provider=self.provider)
133        res = self.client.patch(
134            reverse(
135                "authentik_providers_ssf:stream",
136                kwargs={"application_slug": self.application.slug},
137            ),
138            data={
139                "delivery": {"endpoint_url": "https://localhost"},
140                "stream_id": str(stream.pk),
141            },
142            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
143        )
144        self.assertEqual(res.status_code, 200)
145        self.assertIn(str(stream.pk), res.content.decode())
146        self.assertNotIn(str(other_stream.pk), res.content.decode())
147
148    def test_stream_put(self):
149        """put stream, expecting 200 and the new aud to be saved"""
150        stream = Stream.objects.create(provider=self.provider)
151        res = self.client.put(
152            reverse(
153                "authentik_providers_ssf:stream",
154                kwargs={"application_slug": self.application.slug},
155            ),
156            data={
157                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
158                "aud": ["https://app.authentik.company"],
159                "delivery": {
160                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
161                    "endpoint_url": "https://app.authentik.company",
162                },
163                "events_requested": [
164                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
165                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
166                ],
167                "format": "iss_sub",
168                "stream_id": str(stream.pk),
169            },
170            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
171        )
172        self.assertEqual(res.status_code, 200)
173        self.assertIn(str(stream.pk), res.content.decode())
174        stream.refresh_from_db()
175        self.assertEqual(stream.aud, ["https://app.authentik.company"])
176
177    def test_stream_verify(self):
178        """Test stream verify, expecting 204"""
179        stream = Stream.objects.create(provider=self.provider)
180        res = self.client.post(
181            reverse(
182                "authentik_providers_ssf:stream-verify",
183                kwargs={"application_slug": self.application.slug},
184            ),
185            data={
186                "stream_id": str(stream.pk),
187            },
188            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
189        )
190        self.assertEqual(res.status_code, 204)
191
192    def test_stream_status(self):
193        """Test stream status, expecting the stream's ID and status"""
194        stream = Stream.objects.create(provider=self.provider)
195        res = self.client.get(
196            reverse(
197                "authentik_providers_ssf:stream-status",
198                kwargs={"application_slug": self.application.slug},
199            ),
200            data={
201                "stream_id": str(stream.pk),
202            },
203            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
204        )
205        self.assertEqual(res.status_code, 200)
206        self.assertJSONEqual(
207            res.content,
208            {
209                "stream_id": str(stream.pk),
210                "status": str(stream.status),
211            },
212        )
213
214    def test_stream_status_not_found(self):
215        """Test stream status with an unknown stream_id, expecting 404"""
216        Stream.objects.create(provider=self.provider)
217        res = self.client.get(
218            reverse(
219                "authentik_providers_ssf:stream-status",
220                kwargs={"application_slug": self.application.slug},
221            ),
222            data={
223                "stream_id": str(uuid4()),
224            },
225            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
226        )
227        self.assertEqual(res.status_code, 404)
class TestStream(rest_framework.test.APITestCase):
 19class TestStream(APITestCase):  # API tests for the SSF stream endpoints
 20    def setUp(self):  # create an application and an SSF provider attached to it
 21        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
 22        self.provider = SSFProvider.objects.create(
 23            name=generate_id(),
 24            signing_key=create_test_cert(),
 25            backchannel_application=self.application,
 26        )
 27
 28    def test_stream_add_token(self):
 29        """test stream add (token auth), expecting 201 and a verification event"""
 30        res = self.client.post(
 31            reverse(
 32                "authentik_providers_ssf:stream",
 33                kwargs={"application_slug": self.application.slug},
 34            ),
 35            data={
 36                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 37                "aud": ["https://app.authentik.company"],
 38                "delivery": {
 39                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
 40                    "endpoint_url": "https://app.authentik.company",
 41                },
 42                "events_requested": [
 43                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 44                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 45                ],
 46                "format": "iss_sub",
 47            },
 48            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 49        )
 50        self.assertEqual(res.status_code, 201)
 51        stream = Stream.objects.filter(provider=self.provider).first()
 52        self.assertIsNotNone(stream)
 53        event = StreamEvent.objects.filter(stream=stream).first()
 54        self.assertIsNotNone(event)
 55        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
 56        self.assertEqual(
 57            event.payload["events"],
 58            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
 59        )
 60
 61    def test_stream_add_poll(self):
 62        """test stream add - poll method, expecting a 400 validation error"""
 63        res = self.client.post(
 64            reverse(
 65                "authentik_providers_ssf:stream",
 66                kwargs={"application_slug": self.application.slug},
 67            ),
 68            data={
 69                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
 70                "aud": ["https://app.authentik.company"],
 71                "delivery": {
 72                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/poll",
 73                },
 74                "events_requested": [
 75                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
 76                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
 77                ],
 78                "format": "iss_sub",
 79            },
 80            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 81        )
 82        self.assertEqual(res.status_code, 400)
 83        self.assertJSONEqual(
 84            res.content,
 85            {"delivery": {"method": ["Polling for SSF events is not currently supported."]}},
 86        )
 87
 88    def test_stream_delete(self):
 89        """delete stream, expecting 204 and the stream to be disabled"""
 90        stream = Stream.objects.create(provider=self.provider)
 91        res = self.client.delete(
 92            reverse(
 93                "authentik_providers_ssf:stream",
 94                kwargs={"application_slug": self.application.slug},
 95            ),
 96            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 97        )
 98        self.assertEqual(res.status_code, 204)
 99        stream.refresh_from_db()
100        self.assertEqual(stream.status, StreamStatus.DISABLED)
101
102    def test_stream_get(self):
103        """get stream, expecting 200"""
104        Stream.objects.create(provider=self.provider)
105        res = self.client.get(
106            reverse(
107                "authentik_providers_ssf:stream",
108                kwargs={"application_slug": self.application.slug},
109            ),
110            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
111        )
112        self.assertEqual(res.status_code, 200)
113
114    def test_stream_get_filter_query(self):
115        """get stream filtered by the stream_id query parameter"""
116        other_stream = Stream.objects.create(provider=self.provider)
117        stream = Stream.objects.create(provider=self.provider)
118        res = self.client.get(
119            reverse(
120                "authentik_providers_ssf:stream",
121                kwargs={"application_slug": self.application.slug},
122            )
123            + f"?stream_id={stream.pk}",
124            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
125        )
126        self.assertEqual(res.status_code, 200)
127        self.assertIn(str(stream.pk), res.content.decode())
128        self.assertNotIn(str(other_stream.pk), res.content.decode())
129
130    def test_stream_patch(self):
131        """patch stream selected by stream_id in the request body"""
132        other_stream = Stream.objects.create(provider=self.provider)
133        stream = Stream.objects.create(provider=self.provider)
134        res = self.client.patch(
135            reverse(
136                "authentik_providers_ssf:stream",
137                kwargs={"application_slug": self.application.slug},
138            ),
139            data={
140                "delivery": {"endpoint_url": "https://localhost"},
141                "stream_id": str(stream.pk),
142            },
143            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
144        )
145        self.assertEqual(res.status_code, 200)
146        self.assertIn(str(stream.pk), res.content.decode())
147        self.assertNotIn(str(other_stream.pk), res.content.decode())
148
149    def test_stream_put(self):
150        """put stream, expecting 200 and the new aud to be saved"""
151        stream = Stream.objects.create(provider=self.provider)
152        res = self.client.put(
153            reverse(
154                "authentik_providers_ssf:stream",
155                kwargs={"application_slug": self.application.slug},
156            ),
157            data={
158                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
159                "aud": ["https://app.authentik.company"],
160                "delivery": {
161                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
162                    "endpoint_url": "https://app.authentik.company",
163                },
164                "events_requested": [
165                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
166                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
167                ],
168                "format": "iss_sub",
169                "stream_id": str(stream.pk),
170            },
171            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
172        )
173        self.assertEqual(res.status_code, 200)
174        self.assertIn(str(stream.pk), res.content.decode())
175        stream.refresh_from_db()
176        self.assertEqual(stream.aud, ["https://app.authentik.company"])
177
178    def test_stream_verify(self):
179        """Test stream verify, expecting 204"""
180        stream = Stream.objects.create(provider=self.provider)
181        res = self.client.post(
182            reverse(
183                "authentik_providers_ssf:stream-verify",
184                kwargs={"application_slug": self.application.slug},
185            ),
186            data={
187                "stream_id": str(stream.pk),
188            },
189            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
190        )
191        self.assertEqual(res.status_code, 204)
192
193    def test_stream_status(self):
194        """Test stream status, expecting the stream's ID and status"""
195        stream = Stream.objects.create(provider=self.provider)
196        res = self.client.get(
197            reverse(
198                "authentik_providers_ssf:stream-status",
199                kwargs={"application_slug": self.application.slug},
200            ),
201            data={
202                "stream_id": str(stream.pk),
203            },
204            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
205        )
206        self.assertEqual(res.status_code, 200)
207        self.assertJSONEqual(
208            res.content,
209            {
210                "stream_id": str(stream.pk),
211                "status": str(stream.status),
212            },
213        )
214
215    def test_stream_status_not_found(self):
216        """Test stream status with an unknown stream_id, expecting 404"""
217        Stream.objects.create(provider=self.provider)
218        res = self.client.get(
219            reverse(
220                "authentik_providers_ssf:stream-status",
221                kwargs={"application_slug": self.application.slug},
222            ),
223            data={
224                "stream_id": str(uuid4()),
225            },
226            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
227        )
228        self.assertEqual(res.status_code, 404)

Similar to TransactionTestCase, but use transaction.atomic() to achieve test isolation.

In most situations, TestCase should be preferred to TransactionTestCase as it allows faster execution. However, there are some situations where using TransactionTestCase might be necessary (e.g. testing some transactional behavior).

On database backends with no transaction support, TestCase behaves as TransactionTestCase.

def setUp(self):
20    def setUp(self):  # create an application and an SSF provider attached to it
21        self.application = Application.objects.create(name=generate_id(), slug=generate_id())
22        self.provider = SSFProvider.objects.create(
23            name=generate_id(),
24            signing_key=create_test_cert(),
25            backchannel_application=self.application,
26        )

Hook method for setting up the test fixture before exercising it.

def test_stream_add_token(self):
28    def test_stream_add_token(self):
29        """test stream add (token auth), expecting 201 and a verification event"""
30        res = self.client.post(
31            reverse(
32                "authentik_providers_ssf:stream",
33                kwargs={"application_slug": self.application.slug},
34            ),
35            data={
36                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
37                "aud": ["https://app.authentik.company"],
38                "delivery": {
39                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
40                    "endpoint_url": "https://app.authentik.company",
41                },
42                "events_requested": [
43                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
44                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
45                ],
46                "format": "iss_sub",
47            },
48            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
49        )
50        self.assertEqual(res.status_code, 201)
51        stream = Stream.objects.filter(provider=self.provider).first()
52        self.assertIsNotNone(stream)
53        event = StreamEvent.objects.filter(stream=stream).first()
54        self.assertIsNotNone(event)
55        self.assertEqual(event.status, SSFEventStatus.PENDING_FAILED)
56        self.assertEqual(
57            event.payload["events"],
58            {"https://schemas.openid.net/secevent/ssf/event-type/verification": {"state": None}},
59        )

test stream add (token auth), expecting 201 and a verification event

def test_stream_add_poll(self):
61    def test_stream_add_poll(self):
62        """test stream add - poll method, expecting a 400 validation error"""
63        res = self.client.post(
64            reverse(
65                "authentik_providers_ssf:stream",
66                kwargs={"application_slug": self.application.slug},
67            ),
68            data={
69                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
70                "aud": ["https://app.authentik.company"],
71                "delivery": {
72                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/poll",
73                },
74                "events_requested": [
75                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
76                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
77                ],
78                "format": "iss_sub",
79            },
80            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
81        )
82        self.assertEqual(res.status_code, 400)
83        self.assertJSONEqual(
84            res.content,
85            {"delivery": {"method": ["Polling for SSF events is not currently supported."]}},
86        )

test stream add - poll method, expecting a 400 validation error

def test_stream_delete(self):
 88    def test_stream_delete(self):
 89        """delete stream, expecting 204 and the stream to be disabled"""
 90        stream = Stream.objects.create(provider=self.provider)
 91        res = self.client.delete(
 92            reverse(
 93                "authentik_providers_ssf:stream",
 94                kwargs={"application_slug": self.application.slug},
 95            ),
 96            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
 97        )
 98        self.assertEqual(res.status_code, 204)
 99        stream.refresh_from_db()
100        self.assertEqual(stream.status, StreamStatus.DISABLED)

delete stream, expecting 204 and the stream to be disabled

def test_stream_get(self):
102    def test_stream_get(self):
103        """get stream, expecting 200"""
104        Stream.objects.create(provider=self.provider)
105        res = self.client.get(
106            reverse(
107                "authentik_providers_ssf:stream",
108                kwargs={"application_slug": self.application.slug},
109            ),
110            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
111        )
112        self.assertEqual(res.status_code, 200)

get stream, expecting 200

def test_stream_get_filter_query(self):
114    def test_stream_get_filter_query(self):
115        """get stream filtered by the stream_id query parameter"""
116        other_stream = Stream.objects.create(provider=self.provider)
117        stream = Stream.objects.create(provider=self.provider)
118        res = self.client.get(
119            reverse(
120                "authentik_providers_ssf:stream",
121                kwargs={"application_slug": self.application.slug},
122            )
123            + f"?stream_id={stream.pk}",
124            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
125        )
126        self.assertEqual(res.status_code, 200)
127        self.assertIn(str(stream.pk), res.content.decode())
128        self.assertNotIn(str(other_stream.pk), res.content.decode())

get stream filtered by the stream_id query parameter

def test_stream_patch(self):
130    def test_stream_patch(self):
131        """patch stream selected by stream_id in the request body"""
132        other_stream = Stream.objects.create(provider=self.provider)
133        stream = Stream.objects.create(provider=self.provider)
134        res = self.client.patch(
135            reverse(
136                "authentik_providers_ssf:stream",
137                kwargs={"application_slug": self.application.slug},
138            ),
139            data={
140                "delivery": {"endpoint_url": "https://localhost"},
141                "stream_id": str(stream.pk),
142            },
143            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
144        )
145        self.assertEqual(res.status_code, 200)
146        self.assertIn(str(stream.pk), res.content.decode())
147        self.assertNotIn(str(other_stream.pk), res.content.decode())

patch stream selected by stream_id in the request body

def test_stream_put(self):
149    def test_stream_put(self):
150        """put stream, expecting 200 and the new aud to be saved"""
151        stream = Stream.objects.create(provider=self.provider)
152        res = self.client.put(
153            reverse(
154                "authentik_providers_ssf:stream",
155                kwargs={"application_slug": self.application.slug},
156            ),
157            data={
158                "iss": "https://authentik.company/.well-known/ssf-configuration/foo/5",
159                "aud": ["https://app.authentik.company"],
160                "delivery": {
161                    "method": "https://schemas.openid.net/secevent/risc/delivery-method/push",
162                    "endpoint_url": "https://app.authentik.company",
163                },
164                "events_requested": [
165                    "https://schemas.openid.net/secevent/caep/event-type/credential-change",
166                    "https://schemas.openid.net/secevent/caep/event-type/session-revoked",
167                ],
168                "format": "iss_sub",
169                "stream_id": str(stream.pk),
170            },
171            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
172        )
173        self.assertEqual(res.status_code, 200)
174        self.assertIn(str(stream.pk), res.content.decode())
175        stream.refresh_from_db()
176        self.assertEqual(stream.aud, ["https://app.authentik.company"])

put stream, expecting 200 and the new aud to be saved

def test_stream_verify(self):
178    def test_stream_verify(self):
179        """Test stream verify, expecting 204"""
180        stream = Stream.objects.create(provider=self.provider)
181        res = self.client.post(
182            reverse(
183                "authentik_providers_ssf:stream-verify",
184                kwargs={"application_slug": self.application.slug},
185            ),
186            data={
187                "stream_id": str(stream.pk),
188            },
189            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
190        )
191        self.assertEqual(res.status_code, 204)

Test stream verify, expecting 204

def test_stream_status(self):
193    def test_stream_status(self):
194        """Test stream status, expecting the stream's ID and status"""
195        stream = Stream.objects.create(provider=self.provider)
196        res = self.client.get(
197            reverse(
198                "authentik_providers_ssf:stream-status",
199                kwargs={"application_slug": self.application.slug},
200            ),
201            data={
202                "stream_id": str(stream.pk),
203            },
204            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
205        )
206        self.assertEqual(res.status_code, 200)
207        self.assertJSONEqual(
208            res.content,
209            {
210                "stream_id": str(stream.pk),
211                "status": str(stream.status),
212            },
213        )

Test stream status, expecting the stream's ID and status

def test_stream_status_not_found(self):
215    def test_stream_status_not_found(self):
216        """Test stream status with an unknown stream_id, expecting 404"""
217        Stream.objects.create(provider=self.provider)
218        res = self.client.get(
219            reverse(
220                "authentik_providers_ssf:stream-status",
221                kwargs={"application_slug": self.application.slug},
222            ),
223            data={
224                "stream_id": str(uuid4()),
225            },
226            HTTP_AUTHORIZATION=f"Bearer {self.provider.token.key}",
227        )
228        self.assertEqual(res.status_code, 404)

Test stream status with an unknown stream_id, expecting 404