authentik.core.tests.test_users_api

Test Users API

   1"""Test Users API"""
   2
   3from datetime import datetime, timedelta
   4from json import loads
   5
   6from django.contrib.auth.hashers import make_password
   7from django.urls.base import reverse
   8from django.utils.timezone import now
   9from rest_framework.test import APITestCase
  10
  11from authentik.brands.models import Brand
  12from authentik.core.models import (
  13    USER_ATTRIBUTE_TOKEN_EXPIRING,
  14    AuthenticatedSession,
  15    Group,
  16    Session,
  17    Token,
  18    User,
  19    UserTypes,
  20)
  21from authentik.core.tests.utils import (
  22    create_test_admin_user,
  23    create_test_brand,
  24    create_test_flow,
  25    create_test_user,
  26)
  27from authentik.flows.models import FlowAuthenticationRequirement, FlowDesignation
  28from authentik.lib.generators import generate_id, generate_key
  29from authentik.rbac.models import Role
  30from authentik.stages.email.models import EmailStage
  31
  32INVALID_PASSWORD_HASH = "not-a-valid-hash"
  33INVALID_PASSWORD_HASH_ERROR = "Invalid password hash format. Must be a valid Django password hash."
  34
  35
  36class TestUsersAPI(APITestCase):
  37    """Test Users API"""
  38
  39    def setUp(self) -> None:
  40        self.admin = create_test_admin_user()
  41        self.user = create_test_user()
  42
  43    def _set_password_hash(self, user: User, password_hash: str, client=None):
  44        return (client or self.client).post(
  45            reverse("authentik_api:user-set-password-hash", kwargs={"pk": user.pk}),
  46            data={"password": password_hash},
  47        )
  48
  49    def _assert_password_hash_set(
  50        self, user: User, password: str, password_hash: str, response
  51    ) -> None:
  52        self.assertEqual(response.status_code, 204, response.data)
  53        user.refresh_from_db()
  54        self.assertEqual(user.password, password_hash)
  55        self.assertTrue(user.check_password(password))
  56
  57    def test_filter_type(self):
  58        """Test API filtering by type"""
  59        self.client.force_login(self.admin)
  60        user = create_test_admin_user(type=UserTypes.EXTERNAL)
  61        response = self.client.get(
  62            reverse("authentik_api:user-list"),
  63            data={
  64                "type": UserTypes.EXTERNAL,
  65                "username": user.username,
  66            },
  67        )
  68        self.assertEqual(response.status_code, 200)
  69
  70    def test_filter_is_superuser(self):
  71        """Test API filtering by superuser status"""
  72        User.objects.all().delete()
  73        admin = create_test_admin_user()
  74        self.client.force_login(admin)
  75        # Test superuser
  76        response = self.client.get(
  77            reverse("authentik_api:user-list"),
  78            data={
  79                "is_superuser": True,
  80            },
  81        )
  82        self.assertEqual(response.status_code, 200)
  83        body = loads(response.content)
  84        self.assertEqual(len(body["results"]), 1)
  85        self.assertEqual(body["results"][0]["username"], admin.username)
  86        # Test non-superuser
  87        user = create_test_user()
  88        response = self.client.get(
  89            reverse("authentik_api:user-list"),
  90            data={
  91                "is_superuser": False,
  92            },
  93        )
  94        self.assertEqual(response.status_code, 200)
  95        body = loads(response.content)
  96        self.assertEqual(len(body["results"]), 1, body)
  97        self.assertEqual(body["results"][0]["username"], user.username)
  98
  99    def test_list_with_groups(self):
 100        """Test listing with groups"""
 101        self.client.force_login(self.admin)
 102        response = self.client.get(reverse("authentik_api:user-list"), {"include_groups": "true"})
 103        self.assertEqual(response.status_code, 200)
 104
 105    def test_recovery_no_flow(self):
 106        """Test user recovery link (no recovery flow set)"""
 107        self.client.force_login(self.admin)
 108        response = self.client.post(
 109            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk})
 110        )
 111        self.assertEqual(response.status_code, 400)
 112        self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})
 113
 114    def test_set_password(self):
 115        """Test Direct password set"""
 116        self.client.force_login(self.admin)
 117        new_pw = generate_key()
 118        response = self.client.post(
 119            reverse("authentik_api:user-set-password", kwargs={"pk": self.admin.pk}),
 120            data={"password": new_pw},
 121        )
 122        self.assertEqual(response.status_code, 204)
 123        self.admin.refresh_from_db()
 124        self.assertTrue(self.admin.check_password(new_pw))
 125
 126    def test_set_password_blank(self):
 127        """Test Direct password set"""
 128        self.client.force_login(self.admin)
 129        response = self.client.post(
 130            reverse("authentik_api:user-set-password", kwargs={"pk": self.admin.pk}),
 131            data={"password": ""},
 132        )
 133        self.assertEqual(response.status_code, 400)
 134        self.assertJSONEqual(response.content, {"password": ["This field may not be blank."]})
 135
 136    def test_set_password_hash(self):
 137        """Test setting a user's password from a hash."""
 138        self.client.force_login(self.admin)
 139        password = generate_key()
 140        password_hash = make_password(password)
 141        response = self._set_password_hash(self.user, password_hash)
 142
 143        self._assert_password_hash_set(self.user, password, password_hash, response)
 144
 145    def test_set_password_hash_invalid(self):
 146        """Test invalid password hashes are rejected."""
 147        self.client.force_login(self.admin)
 148        response = self._set_password_hash(self.user, INVALID_PASSWORD_HASH)
 149
 150        self.assertEqual(response.status_code, 400)
 151        self.assertJSONEqual(
 152            response.content,
 153            {"password": [INVALID_PASSWORD_HASH_ERROR]},
 154        )
 155
 156    def test_recovery(self):
 157        """Test user recovery link"""
 158        flow = create_test_flow(
 159            FlowDesignation.RECOVERY,
 160            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
 161        )
 162        brand: Brand = create_test_brand()
 163        brand.flow_recovery = flow
 164        brand.save()
 165        self.client.force_login(self.admin)
 166        response = self.client.post(
 167            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk})
 168        )
 169        self.assertEqual(response.status_code, 200)
 170
 171    def test_recovery_duration(self):
 172        """Test user recovery token duration"""
 173        Token.objects.all().delete()
 174        flow = create_test_flow(
 175            FlowDesignation.RECOVERY,
 176            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
 177        )
 178        brand: Brand = create_test_brand()
 179        brand.flow_recovery = flow
 180        brand.save()
 181        self.client.force_login(self.admin)
 182        response = self.client.post(
 183            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
 184            data={"token_duration": "days=33"},
 185        )
 186        self.assertEqual(response.status_code, 200)
 187        expires = Token.objects.first().expires
 188        expected_expires = now() + timedelta(days=33)
 189        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
 190
 191    def test_recovery_duration_update(self):
 192        """Test user recovery token duration update"""
 193        Token.objects.all().delete()
 194        flow = create_test_flow(
 195            FlowDesignation.RECOVERY,
 196            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
 197        )
 198        brand: Brand = create_test_brand()
 199        brand.flow_recovery = flow
 200        brand.save()
 201        self.client.force_login(self.admin)
 202        response = self.client.post(
 203            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
 204            data={"token_duration": "days=33"},
 205        )
 206        self.assertEqual(response.status_code, 200)
 207        expires = Token.objects.first().expires
 208        expected_expires = now() + timedelta(days=33)
 209        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
 210        response = self.client.post(
 211            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
 212            data={"token_duration": "days=66"},
 213        )
 214        expires = Token.objects.first().expires
 215        expected_expires = now() + timedelta(days=66)
 216        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
 217
 218    def test_recovery_email_no_flow(self):
 219        """Test user recovery link (no recovery flow set)"""
 220        self.client.force_login(self.admin)
 221        self.user.email = ""
 222        self.user.save()
 223        stage = EmailStage.objects.create(name="email")
 224        response = self.client.post(
 225            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
 226            data={"email_stage": stage.pk},
 227        )
 228        self.assertEqual(response.status_code, 400)
 229        self.assertJSONEqual(
 230            response.content, {"non_field_errors": "User does not have an email address set."}
 231        )
 232        self.user.email = "foo@bar.baz"
 233        self.user.save()
 234        response = self.client.post(
 235            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
 236            data={"email_stage": stage.pk},
 237        )
 238        self.assertEqual(response.status_code, 400)
 239        self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})
 240
 241    def test_recovery_email_no_stage(self):
 242        """Test user recovery link (no email stage)"""
 243        self.user.email = "foo@bar.baz"
 244        self.user.save()
 245        flow = create_test_flow(designation=FlowDesignation.RECOVERY)
 246        brand: Brand = create_test_brand()
 247        brand.flow_recovery = flow
 248        brand.save()
 249        self.client.force_login(self.admin)
 250        response = self.client.post(
 251            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
 252        )
 253        self.assertEqual(response.status_code, 400)
 254        self.assertJSONEqual(response.content, {"email_stage": ["This field is required."]})
 255
 256    def test_recovery_email(self):
 257        """Test user recovery link"""
 258        self.user.email = "foo@bar.baz"
 259        self.user.save()
 260        flow = create_test_flow(FlowDesignation.RECOVERY)
 261        brand: Brand = create_test_brand()
 262        brand.flow_recovery = flow
 263        brand.save()
 264
 265        stage = EmailStage.objects.create(name="email")
 266
 267        self.client.force_login(self.admin)
 268        response = self.client.post(
 269            reverse(
 270                "authentik_api:user-recovery-email",
 271                kwargs={"pk": self.user.pk},
 272            ),
 273            data={"email_stage": stage.pk},
 274        )
 275        self.assertEqual(response.status_code, 204)
 276
 277    def test_service_account(self):
 278        """Service account creation"""
 279        self.client.force_login(self.admin)
 280        response = self.client.post(reverse("authentik_api:user-service-account"))
 281        self.assertEqual(response.status_code, 400)
 282        response = self.client.post(
 283            reverse("authentik_api:user-service-account"),
 284            data={
 285                "name": "test-sa",
 286                "create_group": True,
 287            },
 288        )
 289        self.assertEqual(response.status_code, 200)
 290
 291        user_filter = User.objects.filter(
 292            username="test-sa",
 293            type=UserTypes.SERVICE_ACCOUNT,
 294            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
 295        )
 296        self.assertTrue(user_filter.exists())
 297        user: User = user_filter.first()
 298        self.assertFalse(user.has_usable_password())
 299
 300        token_filter = Token.objects.filter(user=user)
 301        self.assertTrue(token_filter.exists())
 302        self.assertTrue(token_filter.first().expiring)
 303
 304    def test_service_account_set_password_hash(self):
 305        """Service account password hash can be set through the API."""
 306        self.client.force_login(self.admin)
 307        response = self.client.post(
 308            reverse("authentik_api:user-service-account"),
 309            data={
 310                "name": "test-sa",
 311                "create_group": False,
 312            },
 313        )
 314        self.assertEqual(response.status_code, 200, response.data)
 315        body = loads(response.content)
 316
 317        user = User.objects.get(pk=body["user_pk"])
 318        self.assertEqual(user.type, UserTypes.SERVICE_ACCOUNT)
 319        self.assertFalse(user.has_usable_password())
 320
 321        password = generate_key()
 322        password_hash = make_password(password)
 323        response = self._set_password_hash(user, password_hash)
 324
 325        self._assert_password_hash_set(user, password, password_hash, response)
 326
 327    def test_service_account_no_expire(self):
 328        """Service account creation without token expiration"""
 329        self.client.force_login(self.admin)
 330        response = self.client.post(
 331            reverse("authentik_api:user-service-account"),
 332            data={
 333                "name": "test-sa",
 334                "create_group": True,
 335                "expiring": False,
 336            },
 337        )
 338        self.assertEqual(response.status_code, 200)
 339
 340        user_filter = User.objects.filter(
 341            username="test-sa",
 342            type=UserTypes.SERVICE_ACCOUNT,
 343            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: False},
 344        )
 345        self.assertTrue(user_filter.exists())
 346        user: User = user_filter.first()
 347        self.assertFalse(user.has_usable_password())
 348
 349        token_filter = Token.objects.filter(user=user)
 350        self.assertTrue(token_filter.exists())
 351        self.assertFalse(token_filter.first().expiring)
 352
 353    def test_service_account_with_custom_expire(self):
 354        """Service account creation with custom token expiration date"""
 355        self.client.force_login(self.admin)
 356        expire_on = datetime(2050, 11, 11, 11, 11, 11).astimezone()
 357        response = self.client.post(
 358            reverse("authentik_api:user-service-account"),
 359            data={
 360                "name": "test-sa",
 361                "create_group": True,
 362                "expires": expire_on.isoformat(),
 363            },
 364        )
 365        self.assertEqual(response.status_code, 200)
 366
 367        user_filter = User.objects.filter(
 368            username="test-sa",
 369            type=UserTypes.SERVICE_ACCOUNT,
 370            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
 371        )
 372        self.assertTrue(user_filter.exists())
 373        user: User = user_filter.first()
 374        self.assertFalse(user.has_usable_password())
 375
 376        token_filter = Token.objects.filter(user=user)
 377        self.assertTrue(token_filter.exists())
 378        token = token_filter.first()
 379        self.assertTrue(token.expiring)
 380        self.assertEqual(token.expires, expire_on)
 381
 382    def test_service_account_invalid(self):
 383        """Service account creation (twice with same name, expect error)"""
 384        self.client.force_login(self.admin)
 385        response = self.client.post(
 386            reverse("authentik_api:user-service-account"),
 387            data={
 388                "name": "test-sa",
 389                "create_group": True,
 390            },
 391        )
 392        self.assertEqual(response.status_code, 200)
 393
 394        user_filter = User.objects.filter(
 395            username="test-sa",
 396            type=UserTypes.SERVICE_ACCOUNT,
 397            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
 398        )
 399        self.assertTrue(user_filter.exists())
 400        user: User = user_filter.first()
 401        self.assertFalse(user.has_usable_password())
 402
 403        token_filter = Token.objects.filter(user=user)
 404        self.assertTrue(token_filter.exists())
 405        self.assertTrue(token_filter.first().expiring)
 406
 407        response = self.client.post(
 408            reverse("authentik_api:user-service-account"),
 409            data={
 410                "name": "test-sa",
 411                "create_group": True,
 412            },
 413        )
 414        self.assertEqual(response.status_code, 400)
 415
 416    def test_paths(self):
 417        """Test path"""
 418        self.client.force_login(self.admin)
 419        response = self.client.get(
 420            reverse("authentik_api:user-paths"),
 421        )
 422        self.assertEqual(response.status_code, 200)
 423        expected = list(
 424            User.objects.all()
 425            .values("path")
 426            .distinct()
 427            .order_by("path")
 428            .values_list("path", flat=True)
 429        )
 430        self.assertJSONEqual(response.content.decode(), {"paths": expected})
 431
 432    def test_path_valid(self):
 433        """Test path"""
 434        self.client.force_login(self.admin)
 435        response = self.client.post(
 436            reverse("authentik_api:user-list"),
 437            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "foo"},
 438        )
 439        self.assertEqual(response.status_code, 201)
 440
 441    def test_path_invalid(self):
 442        """Test path (invalid)"""
 443        self.client.force_login(self.admin)
 444        response = self.client.post(
 445            reverse("authentik_api:user-list"),
 446            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "/foo"},
 447        )
 448        self.assertEqual(response.status_code, 400)
 449        self.assertJSONEqual(
 450            response.content.decode(), {"path": ["No leading or trailing slashes allowed."]}
 451        )
 452
 453        self.client.force_login(self.admin)
 454        response = self.client.post(
 455            reverse("authentik_api:user-list"),
 456            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": ""},
 457        )
 458        self.assertEqual(response.status_code, 400)
 459        self.assertJSONEqual(response.content.decode(), {"path": ["This field may not be blank."]})
 460
 461        response = self.client.post(
 462            reverse("authentik_api:user-list"),
 463            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "foo/"},
 464        )
 465        self.assertEqual(response.status_code, 400)
 466        self.assertJSONEqual(
 467            response.content.decode(), {"path": ["No leading or trailing slashes allowed."]}
 468        )
 469
 470        response = self.client.post(
 471            reverse("authentik_api:user-list"),
 472            data={
 473                "name": generate_id(),
 474                "username": generate_id(),
 475                "groups": [],
 476                "path": "fos//o",
 477            },
 478        )
 479        self.assertEqual(response.status_code, 400)
 480        self.assertJSONEqual(
 481            response.content.decode(), {"path": ["No empty segments in user path allowed."]}
 482        )
 483
 484    def test_me(self):
 485        """Test user's me endpoint"""
 486        self.client.force_login(self.admin)
 487        response = self.client.get(reverse("authentik_api:user-me"))
 488        self.assertEqual(response.status_code, 200)
 489
 490    def test_session_delete(self):
 491        """Ensure sessions are deleted when a user is deactivated"""
 492        user = create_test_admin_user()
 493        session_id = generate_id()
 494        session = Session.objects.create(
 495            session_key=session_id,
 496            last_ip="255.255.255.255",
 497            last_user_agent="",
 498        )
 499        AuthenticatedSession.objects.create(
 500            session=session,
 501            user=user,
 502        )
 503
 504        self.client.force_login(self.admin)
 505        response = self.client.patch(
 506            reverse("authentik_api:user-detail", kwargs={"pk": user.pk}),
 507            data={
 508                "is_active": False,
 509            },
 510        )
 511        self.assertEqual(response.status_code, 200)
 512
 513        self.assertFalse(Session.objects.filter(session_key=session_id).exists())
 514        self.assertFalse(
 515            AuthenticatedSession.objects.filter(session__session_key=session_id).exists()
 516        )
 517
 518    def test_sort_by_last_updated(self):
 519        """Test API sorting by last_updated"""
 520        User.objects.all().delete()
 521        admin = create_test_admin_user()
 522        self.client.force_login(admin)
 523
 524        user = create_test_user()
 525        admin.first_name = "Sample change"
 526        admin.last_name = "To trigger an update"
 527        admin.save()
 528
 529        # Ascending
 530        response = self.client.get(
 531            reverse("authentik_api:user-list"),
 532            data={
 533                "ordering": "last_updated",
 534            },
 535        )
 536        self.assertEqual(response.status_code, 200)
 537
 538        body = loads(response.content)
 539        self.assertEqual(len(body["results"]), 2)
 540        self.assertEqual(body["results"][0]["pk"], user.pk)
 541
 542        # Descending
 543        response = self.client.get(
 544            reverse("authentik_api:user-list"),
 545            data={
 546                "ordering": "-last_updated",
 547            },
 548        )
 549        self.assertEqual(response.status_code, 200)
 550
 551        body = loads(response.content)
 552        self.assertEqual(len(body["results"]), 2)
 553        self.assertEqual(body["results"][0]["pk"], admin.pk)
 554
 555    def test_sort_by_date_joined(self):
 556        """Test API sorting by date_joined"""
 557        User.objects.all().delete()
 558        admin = create_test_admin_user()
 559        self.client.force_login(admin)
 560
 561        user = create_test_user()
 562
 563        response = self.client.get(
 564            reverse("authentik_api:user-list"),
 565            data={
 566                "ordering": "date_joined",
 567            },
 568        )
 569        self.assertEqual(response.status_code, 200)
 570
 571        body = loads(response.content)
 572        self.assertEqual(len(body["results"]), 2)
 573        self.assertEqual(body["results"][0]["pk"], admin.pk)
 574
 575        response = self.client.get(
 576            reverse("authentik_api:user-list"),
 577            data={
 578                "ordering": "-date_joined",
 579            },
 580        )
 581        self.assertEqual(response.status_code, 200)
 582
 583        body = loads(response.content)
 584        self.assertEqual(len(body["results"]), 2)
 585        self.assertEqual(body["results"][0]["pk"], user.pk)
 586
 587    def test_service_account_validation_empty_username(self):
 588        """Test service account creation with empty/blank username validation"""
 589        self.client.force_login(self.admin)
 590
 591        # Test with empty string
 592        response = self.client.post(
 593            reverse("authentik_api:user-service-account"),
 594            data={
 595                "name": "",
 596                "create_group": True,
 597            },
 598        )
 599        self.assertEqual(response.status_code, 400)
 600        self.assertJSONEqual(
 601            response.content,
 602            {"name": ["This field may not be blank."]},
 603        )
 604
 605        # Test with only whitespace
 606        response = self.client.post(
 607            reverse("authentik_api:user-service-account"),
 608            data={
 609                "name": "   ",
 610                "create_group": True,
 611            },
 612        )
 613        self.assertEqual(response.status_code, 400)
 614        self.assertJSONEqual(
 615            response.content,
 616            {"name": ["This field may not be blank."]},
 617        )
 618
 619        # Test with tab and newline characters
 620        response = self.client.post(
 621            reverse("authentik_api:user-service-account"),
 622            data={
 623                "name": "\t\n",
 624                "create_group": True,
 625            },
 626        )
 627        self.assertEqual(response.status_code, 400)
 628        self.assertJSONEqual(
 629            response.content,
 630            {"name": ["This field may not be blank."]},
 631        )
 632
 633    def test_service_account_validation_valid_username(self):
 634        """Test service account creation with valid username"""
 635        self.client.force_login(self.admin)
 636
 637        # Test with valid username
 638        response = self.client.post(
 639            reverse("authentik_api:user-service-account"),
 640            data={
 641                "name": "valid-service-account",
 642                "create_group": True,
 643            },
 644        )
 645        self.assertEqual(response.status_code, 200)
 646
 647        # Verify response structure
 648        body = loads(response.content)
 649        self.assertIn("username", body)
 650        self.assertIn("user_uid", body)
 651        self.assertIn("user_pk", body)
 652        self.assertIn("group_pk", body)  # Should exist since create_group=True
 653        self.assertIn("token", body)
 654
 655        # Verify field types
 656        self.assertEqual(body["username"], "valid-service-account")
 657        self.assertIsInstance(body["user_pk"], int)
 658        self.assertIsInstance(body["user_uid"], str)
 659        self.assertIsInstance(body["token"], str)
 660        self.assertIsInstance(body["group_pk"], str)
 661
 662    def test_service_account_validation_without_group(self):
 663        """Test service account creation without creating a group"""
 664        self.client.force_login(self.admin)
 665
 666        response = self.client.post(
 667            reverse("authentik_api:user-service-account"),
 668            data={
 669                "name": "no-group-service-account",
 670                "create_group": False,
 671            },
 672        )
 673        self.assertEqual(response.status_code, 200)
 674
 675        body = loads(response.content)
 676        self.assertIn("username", body)
 677        self.assertIn("user_uid", body)
 678        self.assertIn("user_pk", body)
 679        self.assertIn("token", body)
 680        # Should NOT have group_pk when create_group=False
 681        self.assertNotIn("group_pk", body)
 682
 683    def test_service_account_validation_duplicate_username(self):
 684        """Test service account creation with duplicate username"""
 685        self.client.force_login(self.admin)
 686
 687        # Create first service account
 688        response = self.client.post(
 689            reverse("authentik_api:user-service-account"),
 690            data={
 691                "name": "duplicate-test",
 692                "create_group": True,
 693            },
 694        )
 695        self.assertEqual(response.status_code, 200)
 696
 697        # Attempt to create second with same username
 698        response = self.client.post(
 699            reverse("authentik_api:user-service-account"),
 700            data={
 701                "name": "duplicate-test",
 702                "create_group": True,
 703            },
 704        )
 705        self.assertEqual(response.status_code, 400)
 706        self.assertJSONEqual(
 707            response.content,
 708            {"name": ["This field must be unique."]},
 709        )
 710
 711    def test_service_account_validation_invalid_create_group(self):
 712        """Test service account creation with invalid create_group field"""
 713        self.client.force_login(self.admin)
 714
 715        # Test with string instead of boolean
 716        response = self.client.post(
 717            reverse("authentik_api:user-service-account"),
 718            data={
 719                "name": "test-sa",
 720                "create_group": "invalid",
 721            },
 722        )
 723        self.assertEqual(response.status_code, 400)
 724        self.assertJSONEqual(
 725            response.content,
 726            {"create_group": ["Must be a valid boolean."]},
 727        )
 728
 729        # Test with number instead of boolean
 730        response = self.client.post(
 731            reverse("authentik_api:user-service-account"),
 732            data={
 733                "name": "test-sa",
 734                "create_group": 123,
 735            },
 736        )
 737        self.assertEqual(response.status_code, 400)
 738        self.assertJSONEqual(
 739            response.content,
 740            {"create_group": ["Must be a valid boolean."]},
 741        )
 742
 743    def test_service_account_validation_invalid_expiring(self):
 744        """Test service account creation with invalid expiring field"""
 745        self.client.force_login(self.admin)
 746
 747        # Test with string instead of boolean
 748        response = self.client.post(
 749            reverse("authentik_api:user-service-account"),
 750            data={
 751                "name": "test-sa",
 752                "expiring": "invalid",
 753            },
 754        )
 755        self.assertEqual(response.status_code, 400)
 756        self.assertJSONEqual(
 757            response.content,
 758            {"expiring": ["Must be a valid boolean."]},
 759        )
 760
 761    def test_service_account_validation_invalid_expires(self):
 762        """Test service account creation with invalid expires field"""
 763        self.client.force_login(self.admin)
 764
 765        # Test with invalid datetime string
 766        response = self.client.post(
 767            reverse("authentik_api:user-service-account"),
 768            data={
 769                "name": "test-sa",
 770                "expires": "invalid-datetime",
 771            },
 772        )
 773        self.assertEqual(response.status_code, 400)
 774        self.assertJSONEqual(
 775            response.content,
 776            {
 777                "expires": [
 778                    "Datetime has wrong format. Use one of these formats instead: "
 779                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
 780                ]
 781            },
 782        )
 783
 784        # Test with invalid format
 785        response = self.client.post(
 786            reverse("authentik_api:user-service-account"),
 787            data={
 788                "name": "test-sa",
 789                "expires": "2024-13-45",  # Invalid month/day
 790            },
 791        )
 792        self.assertEqual(response.status_code, 400)
 793        self.assertJSONEqual(
 794            response.content,
 795            {
 796                "expires": [
 797                    "Datetime has wrong format. Use one of these formats instead: "
 798                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
 799                ]
 800            },
 801        )
 802
 803    def test_service_account_validation_multiple_errors(self):
 804        """Test service account creation with multiple validation errors"""
 805        self.client.force_login(self.admin)
 806
 807        response = self.client.post(
 808            reverse("authentik_api:user-service-account"),
 809            data={
 810                "name": "",  # Empty username
 811                "create_group": "invalid",  # Invalid boolean
 812                "expiring": 123,  # Invalid boolean
 813                "expires": "not-a-date",  # Invalid datetime
 814            },
 815        )
 816        self.assertEqual(response.status_code, 400)
 817        self.assertJSONEqual(
 818            response.content,
 819            {
 820                "name": ["This field may not be blank."],
 821                "create_group": ["Must be a valid boolean."],
 822                "expiring": ["Must be a valid boolean."],
 823                "expires": [
 824                    "Datetime has wrong format. Use one of these formats instead: "
 825                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
 826                ],
 827            },
 828        )
 829
 830    def test_service_account_validation_user_friendly_duplicate_error(self):
 831        """Test that duplicate username returns user-friendly error, not database error"""
 832        self.client.force_login(self.admin)
 833
 834        # Create first service account
 835        response = self.client.post(
 836            reverse("authentik_api:user-service-account"),
 837            data={
 838                "name": "duplicate-username-test",
 839                "create_group": True,
 840            },
 841        )
 842        self.assertEqual(response.status_code, 200)
 843
 844        # Attempt to create second with same username
 845        response = self.client.post(
 846            reverse("authentik_api:user-service-account"),
 847            data={
 848                "name": "duplicate-username-test",
 849                "create_group": True,
 850            },
 851        )
 852        self.assertEqual(response.status_code, 400)
 853        self.assertJSONEqual(
 854            response.content,
 855            {"name": ["This field must be unique."]},
 856        )
 857
 858    def test_filter_last_login(self):
 859        """Test API filtering by last_login"""
 860        from datetime import timedelta
 861
 862        from django.utils import timezone
 863
 864        User.objects.all().delete()
 865        admin = create_test_admin_user()
 866        self.client.force_login(admin)
 867
 868        # Create users with different last_login values
 869        user_recent = create_test_user()
 870        user_recent.last_login = timezone.now()
 871        user_recent.save()
 872
 873        user_old = create_test_user()
 874        user_old.last_login = timezone.now() - timedelta(days=400)  # Over 1 year ago
 875        user_old.save()
 876
 877        user_never = create_test_user()
 878        user_never.last_login = None  # Never logged in
 879        user_never.save()
 880
 881        # Filter users who logged in before 1 year ago
 882        one_year_ago = (timezone.now() - timedelta(days=365)).isoformat()
 883        response = self.client.get(
 884            reverse("authentik_api:user-list"),
 885            data={"last_login__lt": one_year_ago},
 886        )
 887        self.assertEqual(response.status_code, 200)
 888        body = loads(response.content)
 889        self.assertEqual(len(body["results"]), 1)
 890        self.assertEqual(body["results"][0]["pk"], user_old.pk)
 891
 892        # Filter users who have never logged in
 893        response = self.client.get(
 894            reverse("authentik_api:user-list"),
 895            data={"last_login__isnull": True},
 896        )
 897        self.assertEqual(response.status_code, 200)
 898        body = loads(response.content)
 899        # Should include user_never and admin (who hasn't logged in via the app)
 900        pks = [r["pk"] for r in body["results"]]
 901        self.assertIn(user_never.pk, pks)
 902
 903    def test_sort_by_last_login(self):
 904        """Test API sorting by last_login"""
 905        from datetime import timedelta
 906
 907        from django.utils import timezone
 908
 909        User.objects.all().delete()
 910        admin = create_test_admin_user()
 911        self.client.force_login(admin)
 912
 913        user1 = create_test_user()
 914        user1.last_login = timezone.now() - timedelta(days=10)
 915        user1.save()
 916
 917        user2 = create_test_user()
 918        user2.last_login = timezone.now() - timedelta(days=5)
 919        user2.save()
 920
 921        # Ascending order (oldest first)
 922        response = self.client.get(
 923            reverse("authentik_api:user-list"),
 924            data={"ordering": "last_login"},
 925        )
 926        self.assertEqual(response.status_code, 200)
 927        body = loads(response.content)
 928        # Users with null last_login come first, then user1 (older), then user2 (newer)
 929        self.assertEqual(len(body["results"]), 3)
 930
 931        # Descending order (newest first)
 932        response = self.client.get(
 933            reverse("authentik_api:user-list"),
 934            data={"ordering": "-last_login"},
 935        )
 936        self.assertEqual(response.status_code, 200)
 937        body = loads(response.content)
 938        # user2 should come before user1 (more recent login)
 939        pks = [r["pk"] for r in body["results"]]
 940        self.assertIn(user1.pk, pks)
 941        self.assertIn(user2.pk, pks)
 942        # Verify user2 comes before user1 in descending order
 943        self.assertLess(pks.index(user2.pk), pks.index(user1.pk))
 944
 945
 946class TestUsersAPIGroupRoleValidation(APITestCase):
 947    """Test that PATCH /api/v3/core/users/{pk}/ enforces group and role permission checks."""
 948
 949    def setUp(self) -> None:
 950        self.actor = create_test_user()
 951        self.target = create_test_user()
 952
 953    def _patch(self, data: dict):
 954        self.client.force_login(self.actor)
 955        return self.client.patch(
 956            reverse("authentik_api:user-detail", kwargs={"pk": self.target.pk}),
 957            data=data,
 958            content_type="application/json",
 959        )
 960
 961    def test_patch_superuser_group_no_perm(self):
 962        """Assigning a superuser group without enable_group_superuser must be rejected."""
 963        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 964        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 965        group = Group.objects.create(name=generate_id(), is_superuser=True)
 966        res = self._patch({"groups": [str(group.pk)]})
 967        self.assertEqual(res.status_code, 400)
 968
 969    def test_patch_superuser_group_with_perm(self):
 970        """Assigning a superuser group with enable_group_superuser must succeed."""
 971        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 972        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 973        self.actor.assign_perms_to_managed_role("authentik_core.enable_group_superuser")
 974        group = Group.objects.create(name=generate_id(), is_superuser=True)
 975        res = self._patch({"groups": [str(group.pk)]})
 976        self.assertEqual(res.status_code, 200)
 977
 978    def test_patch_non_superuser_group_no_perm(self):
 979        """Assigning a non-superuser group without special permission must succeed."""
 980        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 981        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 982        group = Group.objects.create(name=generate_id(), is_superuser=False)
 983        res = self._patch({"groups": [str(group.pk)]})
 984        self.assertEqual(res.status_code, 200)
 985
 986    def test_patch_existing_superuser_group_no_perm(self):
 987        """Keeping an existing superuser group membership without the permission must succeed."""
 988        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 989        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 990        group = Group.objects.create(name=generate_id(), is_superuser=True)
 991        self.target.groups.add(group)
 992        res = self._patch({"groups": [str(group.pk)]})
 993        self.assertEqual(res.status_code, 200)
 994
 995    def test_patch_role_no_perm(self):
 996        """Assigning a new role without change_role must be rejected."""
 997        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 998        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 999        role = Role.objects.create(name=generate_id())
1000        res = self._patch({"roles": [str(role.pk)]})
1001        self.assertEqual(res.status_code, 400)
1002
1003    def test_patch_role_with_perm(self):
1004        """Assigning a new role with change_role must succeed."""
1005        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
1006        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1007        self.actor.assign_perms_to_managed_role("authentik_rbac.change_role")
1008        role = Role.objects.create(name=generate_id())
1009        res = self._patch({"roles": [str(role.pk)]})
1010        self.assertEqual(res.status_code, 200)
1011
1012    def test_patch_existing_role_no_perm(self):
1013        """Keeping an existing role without change_role must succeed."""
1014        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
1015        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1016        role = Role.objects.create(name=generate_id())
1017        self.target.roles.add(role)
1018        res = self._patch({"roles": [str(role.pk)]})
1019        self.assertEqual(res.status_code, 200)
INVALID_PASSWORD_HASH = 'not-a-valid-hash'
INVALID_PASSWORD_HASH_ERROR = 'Invalid password hash format. Must be a valid Django password hash.'
class TestUsersAPI(rest_framework.test.APITestCase):
 37class TestUsersAPI(APITestCase):
 38    """Test Users API"""
 39
 40    def setUp(self) -> None:
 41        self.admin = create_test_admin_user()
 42        self.user = create_test_user()
 43
 44    def _set_password_hash(self, user: User, password_hash: str, client=None):
 45        return (client or self.client).post(
 46            reverse("authentik_api:user-set-password-hash", kwargs={"pk": user.pk}),
 47            data={"password": password_hash},
 48        )
 49
 50    def _assert_password_hash_set(
 51        self, user: User, password: str, password_hash: str, response
 52    ) -> None:
 53        self.assertEqual(response.status_code, 204, response.data)
 54        user.refresh_from_db()
 55        self.assertEqual(user.password, password_hash)
 56        self.assertTrue(user.check_password(password))
 57
 58    def test_filter_type(self):
 59        """Test API filtering by type"""
 60        self.client.force_login(self.admin)
 61        user = create_test_admin_user(type=UserTypes.EXTERNAL)
 62        response = self.client.get(
 63            reverse("authentik_api:user-list"),
 64            data={
 65                "type": UserTypes.EXTERNAL,
 66                "username": user.username,
 67            },
 68        )
 69        self.assertEqual(response.status_code, 200)
 70
 71    def test_filter_is_superuser(self):
 72        """Test API filtering by superuser status"""
 73        User.objects.all().delete()
 74        admin = create_test_admin_user()
 75        self.client.force_login(admin)
 76        # Test superuser
 77        response = self.client.get(
 78            reverse("authentik_api:user-list"),
 79            data={
 80                "is_superuser": True,
 81            },
 82        )
 83        self.assertEqual(response.status_code, 200)
 84        body = loads(response.content)
 85        self.assertEqual(len(body["results"]), 1)
 86        self.assertEqual(body["results"][0]["username"], admin.username)
 87        # Test non-superuser
 88        user = create_test_user()
 89        response = self.client.get(
 90            reverse("authentik_api:user-list"),
 91            data={
 92                "is_superuser": False,
 93            },
 94        )
 95        self.assertEqual(response.status_code, 200)
 96        body = loads(response.content)
 97        self.assertEqual(len(body["results"]), 1, body)
 98        self.assertEqual(body["results"][0]["username"], user.username)
 99
100    def test_list_with_groups(self):
101        """Test listing with groups"""
102        self.client.force_login(self.admin)
103        response = self.client.get(reverse("authentik_api:user-list"), {"include_groups": "true"})
104        self.assertEqual(response.status_code, 200)
105
106    def test_recovery_no_flow(self):
107        """Test user recovery link (no recovery flow set)"""
108        self.client.force_login(self.admin)
109        response = self.client.post(
110            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk})
111        )
112        self.assertEqual(response.status_code, 400)
113        self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})
114
115    def test_set_password(self):
116        """Test Direct password set"""
117        self.client.force_login(self.admin)
118        new_pw = generate_key()
119        response = self.client.post(
120            reverse("authentik_api:user-set-password", kwargs={"pk": self.admin.pk}),
121            data={"password": new_pw},
122        )
123        self.assertEqual(response.status_code, 204)
124        self.admin.refresh_from_db()
125        self.assertTrue(self.admin.check_password(new_pw))
126
127    def test_set_password_blank(self):
128        """Test Direct password set"""
129        self.client.force_login(self.admin)
130        response = self.client.post(
131            reverse("authentik_api:user-set-password", kwargs={"pk": self.admin.pk}),
132            data={"password": ""},
133        )
134        self.assertEqual(response.status_code, 400)
135        self.assertJSONEqual(response.content, {"password": ["This field may not be blank."]})
136
137    def test_set_password_hash(self):
138        """Test setting a user's password from a hash."""
139        self.client.force_login(self.admin)
140        password = generate_key()
141        password_hash = make_password(password)
142        response = self._set_password_hash(self.user, password_hash)
143
144        self._assert_password_hash_set(self.user, password, password_hash, response)
145
146    def test_set_password_hash_invalid(self):
147        """Test invalid password hashes are rejected."""
148        self.client.force_login(self.admin)
149        response = self._set_password_hash(self.user, INVALID_PASSWORD_HASH)
150
151        self.assertEqual(response.status_code, 400)
152        self.assertJSONEqual(
153            response.content,
154            {"password": [INVALID_PASSWORD_HASH_ERROR]},
155        )
156
157    def test_recovery(self):
158        """Test user recovery link"""
159        flow = create_test_flow(
160            FlowDesignation.RECOVERY,
161            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
162        )
163        brand: Brand = create_test_brand()
164        brand.flow_recovery = flow
165        brand.save()
166        self.client.force_login(self.admin)
167        response = self.client.post(
168            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk})
169        )
170        self.assertEqual(response.status_code, 200)
171
172    def test_recovery_duration(self):
173        """Test user recovery token duration"""
174        Token.objects.all().delete()
175        flow = create_test_flow(
176            FlowDesignation.RECOVERY,
177            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
178        )
179        brand: Brand = create_test_brand()
180        brand.flow_recovery = flow
181        brand.save()
182        self.client.force_login(self.admin)
183        response = self.client.post(
184            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
185            data={"token_duration": "days=33"},
186        )
187        self.assertEqual(response.status_code, 200)
188        expires = Token.objects.first().expires
189        expected_expires = now() + timedelta(days=33)
190        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
191
192    def test_recovery_duration_update(self):
193        """Test user recovery token duration update"""
194        Token.objects.all().delete()
195        flow = create_test_flow(
196            FlowDesignation.RECOVERY,
197            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
198        )
199        brand: Brand = create_test_brand()
200        brand.flow_recovery = flow
201        brand.save()
202        self.client.force_login(self.admin)
203        response = self.client.post(
204            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
205            data={"token_duration": "days=33"},
206        )
207        self.assertEqual(response.status_code, 200)
208        expires = Token.objects.first().expires
209        expected_expires = now() + timedelta(days=33)
210        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
211        response = self.client.post(
212            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
213            data={"token_duration": "days=66"},
214        )
215        expires = Token.objects.first().expires
216        expected_expires = now() + timedelta(days=66)
217        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
218
219    def test_recovery_email_no_flow(self):
220        """Test user recovery link (no recovery flow set)"""
221        self.client.force_login(self.admin)
222        self.user.email = ""
223        self.user.save()
224        stage = EmailStage.objects.create(name="email")
225        response = self.client.post(
226            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
227            data={"email_stage": stage.pk},
228        )
229        self.assertEqual(response.status_code, 400)
230        self.assertJSONEqual(
231            response.content, {"non_field_errors": "User does not have an email address set."}
232        )
233        self.user.email = "foo@bar.baz"
234        self.user.save()
235        response = self.client.post(
236            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
237            data={"email_stage": stage.pk},
238        )
239        self.assertEqual(response.status_code, 400)
240        self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})
241
242    def test_recovery_email_no_stage(self):
243        """Test user recovery link (no email stage)"""
244        self.user.email = "foo@bar.baz"
245        self.user.save()
246        flow = create_test_flow(designation=FlowDesignation.RECOVERY)
247        brand: Brand = create_test_brand()
248        brand.flow_recovery = flow
249        brand.save()
250        self.client.force_login(self.admin)
251        response = self.client.post(
252            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
253        )
254        self.assertEqual(response.status_code, 400)
255        self.assertJSONEqual(response.content, {"email_stage": ["This field is required."]})
256
257    def test_recovery_email(self):
258        """Test user recovery link"""
259        self.user.email = "foo@bar.baz"
260        self.user.save()
261        flow = create_test_flow(FlowDesignation.RECOVERY)
262        brand: Brand = create_test_brand()
263        brand.flow_recovery = flow
264        brand.save()
265
266        stage = EmailStage.objects.create(name="email")
267
268        self.client.force_login(self.admin)
269        response = self.client.post(
270            reverse(
271                "authentik_api:user-recovery-email",
272                kwargs={"pk": self.user.pk},
273            ),
274            data={"email_stage": stage.pk},
275        )
276        self.assertEqual(response.status_code, 204)
277
278    def test_service_account(self):
279        """Service account creation"""
280        self.client.force_login(self.admin)
281        response = self.client.post(reverse("authentik_api:user-service-account"))
282        self.assertEqual(response.status_code, 400)
283        response = self.client.post(
284            reverse("authentik_api:user-service-account"),
285            data={
286                "name": "test-sa",
287                "create_group": True,
288            },
289        )
290        self.assertEqual(response.status_code, 200)
291
292        user_filter = User.objects.filter(
293            username="test-sa",
294            type=UserTypes.SERVICE_ACCOUNT,
295            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
296        )
297        self.assertTrue(user_filter.exists())
298        user: User = user_filter.first()
299        self.assertFalse(user.has_usable_password())
300
301        token_filter = Token.objects.filter(user=user)
302        self.assertTrue(token_filter.exists())
303        self.assertTrue(token_filter.first().expiring)
304
305    def test_service_account_set_password_hash(self):
306        """Service account password hash can be set through the API."""
307        self.client.force_login(self.admin)
308        response = self.client.post(
309            reverse("authentik_api:user-service-account"),
310            data={
311                "name": "test-sa",
312                "create_group": False,
313            },
314        )
315        self.assertEqual(response.status_code, 200, response.data)
316        body = loads(response.content)
317
318        user = User.objects.get(pk=body["user_pk"])
319        self.assertEqual(user.type, UserTypes.SERVICE_ACCOUNT)
320        self.assertFalse(user.has_usable_password())
321
322        password = generate_key()
323        password_hash = make_password(password)
324        response = self._set_password_hash(user, password_hash)
325
326        self._assert_password_hash_set(user, password, password_hash, response)
327
328    def test_service_account_no_expire(self):
329        """Service account creation without token expiration"""
330        self.client.force_login(self.admin)
331        response = self.client.post(
332            reverse("authentik_api:user-service-account"),
333            data={
334                "name": "test-sa",
335                "create_group": True,
336                "expiring": False,
337            },
338        )
339        self.assertEqual(response.status_code, 200)
340
341        user_filter = User.objects.filter(
342            username="test-sa",
343            type=UserTypes.SERVICE_ACCOUNT,
344            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: False},
345        )
346        self.assertTrue(user_filter.exists())
347        user: User = user_filter.first()
348        self.assertFalse(user.has_usable_password())
349
350        token_filter = Token.objects.filter(user=user)
351        self.assertTrue(token_filter.exists())
352        self.assertFalse(token_filter.first().expiring)
353
354    def test_service_account_with_custom_expire(self):
355        """Service account creation with custom token expiration date"""
356        self.client.force_login(self.admin)
357        expire_on = datetime(2050, 11, 11, 11, 11, 11).astimezone()
358        response = self.client.post(
359            reverse("authentik_api:user-service-account"),
360            data={
361                "name": "test-sa",
362                "create_group": True,
363                "expires": expire_on.isoformat(),
364            },
365        )
366        self.assertEqual(response.status_code, 200)
367
368        user_filter = User.objects.filter(
369            username="test-sa",
370            type=UserTypes.SERVICE_ACCOUNT,
371            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
372        )
373        self.assertTrue(user_filter.exists())
374        user: User = user_filter.first()
375        self.assertFalse(user.has_usable_password())
376
377        token_filter = Token.objects.filter(user=user)
378        self.assertTrue(token_filter.exists())
379        token = token_filter.first()
380        self.assertTrue(token.expiring)
381        self.assertEqual(token.expires, expire_on)
382
383    def test_service_account_invalid(self):
384        """Service account creation (twice with same name, expect error)"""
385        self.client.force_login(self.admin)
386        response = self.client.post(
387            reverse("authentik_api:user-service-account"),
388            data={
389                "name": "test-sa",
390                "create_group": True,
391            },
392        )
393        self.assertEqual(response.status_code, 200)
394
395        user_filter = User.objects.filter(
396            username="test-sa",
397            type=UserTypes.SERVICE_ACCOUNT,
398            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
399        )
400        self.assertTrue(user_filter.exists())
401        user: User = user_filter.first()
402        self.assertFalse(user.has_usable_password())
403
404        token_filter = Token.objects.filter(user=user)
405        self.assertTrue(token_filter.exists())
406        self.assertTrue(token_filter.first().expiring)
407
408        response = self.client.post(
409            reverse("authentik_api:user-service-account"),
410            data={
411                "name": "test-sa",
412                "create_group": True,
413            },
414        )
415        self.assertEqual(response.status_code, 400)
416
417    def test_paths(self):
418        """Test path"""
419        self.client.force_login(self.admin)
420        response = self.client.get(
421            reverse("authentik_api:user-paths"),
422        )
423        self.assertEqual(response.status_code, 200)
424        expected = list(
425            User.objects.all()
426            .values("path")
427            .distinct()
428            .order_by("path")
429            .values_list("path", flat=True)
430        )
431        self.assertJSONEqual(response.content.decode(), {"paths": expected})
432
433    def test_path_valid(self):
434        """Test path"""
435        self.client.force_login(self.admin)
436        response = self.client.post(
437            reverse("authentik_api:user-list"),
438            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "foo"},
439        )
440        self.assertEqual(response.status_code, 201)
441
442    def test_path_invalid(self):
443        """Test path (invalid)"""
444        self.client.force_login(self.admin)
445        response = self.client.post(
446            reverse("authentik_api:user-list"),
447            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "/foo"},
448        )
449        self.assertEqual(response.status_code, 400)
450        self.assertJSONEqual(
451            response.content.decode(), {"path": ["No leading or trailing slashes allowed."]}
452        )
453
454        self.client.force_login(self.admin)
455        response = self.client.post(
456            reverse("authentik_api:user-list"),
457            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": ""},
458        )
459        self.assertEqual(response.status_code, 400)
460        self.assertJSONEqual(response.content.decode(), {"path": ["This field may not be blank."]})
461
462        response = self.client.post(
463            reverse("authentik_api:user-list"),
464            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "foo/"},
465        )
466        self.assertEqual(response.status_code, 400)
467        self.assertJSONEqual(
468            response.content.decode(), {"path": ["No leading or trailing slashes allowed."]}
469        )
470
471        response = self.client.post(
472            reverse("authentik_api:user-list"),
473            data={
474                "name": generate_id(),
475                "username": generate_id(),
476                "groups": [],
477                "path": "fos//o",
478            },
479        )
480        self.assertEqual(response.status_code, 400)
481        self.assertJSONEqual(
482            response.content.decode(), {"path": ["No empty segments in user path allowed."]}
483        )
484
485    def test_me(self):
486        """Test user's me endpoint"""
487        self.client.force_login(self.admin)
488        response = self.client.get(reverse("authentik_api:user-me"))
489        self.assertEqual(response.status_code, 200)
490
491    def test_session_delete(self):
492        """Ensure sessions are deleted when a user is deactivated"""
493        user = create_test_admin_user()
494        session_id = generate_id()
495        session = Session.objects.create(
496            session_key=session_id,
497            last_ip="255.255.255.255",
498            last_user_agent="",
499        )
500        AuthenticatedSession.objects.create(
501            session=session,
502            user=user,
503        )
504
505        self.client.force_login(self.admin)
506        response = self.client.patch(
507            reverse("authentik_api:user-detail", kwargs={"pk": user.pk}),
508            data={
509                "is_active": False,
510            },
511        )
512        self.assertEqual(response.status_code, 200)
513
514        self.assertFalse(Session.objects.filter(session_key=session_id).exists())
515        self.assertFalse(
516            AuthenticatedSession.objects.filter(session__session_key=session_id).exists()
517        )
518
519    def test_sort_by_last_updated(self):
520        """Test API sorting by last_updated"""
521        User.objects.all().delete()
522        admin = create_test_admin_user()
523        self.client.force_login(admin)
524
525        user = create_test_user()
526        admin.first_name = "Sample change"
527        admin.last_name = "To trigger an update"
528        admin.save()
529
530        # Ascending
531        response = self.client.get(
532            reverse("authentik_api:user-list"),
533            data={
534                "ordering": "last_updated",
535            },
536        )
537        self.assertEqual(response.status_code, 200)
538
539        body = loads(response.content)
540        self.assertEqual(len(body["results"]), 2)
541        self.assertEqual(body["results"][0]["pk"], user.pk)
542
543        # Descending
544        response = self.client.get(
545            reverse("authentik_api:user-list"),
546            data={
547                "ordering": "-last_updated",
548            },
549        )
550        self.assertEqual(response.status_code, 200)
551
552        body = loads(response.content)
553        self.assertEqual(len(body["results"]), 2)
554        self.assertEqual(body["results"][0]["pk"], admin.pk)
555
556    def test_sort_by_date_joined(self):
557        """Test API sorting by date_joined"""
558        User.objects.all().delete()
559        admin = create_test_admin_user()
560        self.client.force_login(admin)
561
562        user = create_test_user()
563
564        response = self.client.get(
565            reverse("authentik_api:user-list"),
566            data={
567                "ordering": "date_joined",
568            },
569        )
570        self.assertEqual(response.status_code, 200)
571
572        body = loads(response.content)
573        self.assertEqual(len(body["results"]), 2)
574        self.assertEqual(body["results"][0]["pk"], admin.pk)
575
576        response = self.client.get(
577            reverse("authentik_api:user-list"),
578            data={
579                "ordering": "-date_joined",
580            },
581        )
582        self.assertEqual(response.status_code, 200)
583
584        body = loads(response.content)
585        self.assertEqual(len(body["results"]), 2)
586        self.assertEqual(body["results"][0]["pk"], user.pk)
587
588    def test_service_account_validation_empty_username(self):
589        """Test service account creation with empty/blank username validation"""
590        self.client.force_login(self.admin)
591
592        # Test with empty string
593        response = self.client.post(
594            reverse("authentik_api:user-service-account"),
595            data={
596                "name": "",
597                "create_group": True,
598            },
599        )
600        self.assertEqual(response.status_code, 400)
601        self.assertJSONEqual(
602            response.content,
603            {"name": ["This field may not be blank."]},
604        )
605
606        # Test with only whitespace
607        response = self.client.post(
608            reverse("authentik_api:user-service-account"),
609            data={
610                "name": "   ",
611                "create_group": True,
612            },
613        )
614        self.assertEqual(response.status_code, 400)
615        self.assertJSONEqual(
616            response.content,
617            {"name": ["This field may not be blank."]},
618        )
619
620        # Test with tab and newline characters
621        response = self.client.post(
622            reverse("authentik_api:user-service-account"),
623            data={
624                "name": "\t\n",
625                "create_group": True,
626            },
627        )
628        self.assertEqual(response.status_code, 400)
629        self.assertJSONEqual(
630            response.content,
631            {"name": ["This field may not be blank."]},
632        )
633
634    def test_service_account_validation_valid_username(self):
635        """Test service account creation with valid username"""
636        self.client.force_login(self.admin)
637
638        # Test with valid username
639        response = self.client.post(
640            reverse("authentik_api:user-service-account"),
641            data={
642                "name": "valid-service-account",
643                "create_group": True,
644            },
645        )
646        self.assertEqual(response.status_code, 200)
647
648        # Verify response structure
649        body = loads(response.content)
650        self.assertIn("username", body)
651        self.assertIn("user_uid", body)
652        self.assertIn("user_pk", body)
653        self.assertIn("group_pk", body)  # Should exist since create_group=True
654        self.assertIn("token", body)
655
656        # Verify field types
657        self.assertEqual(body["username"], "valid-service-account")
658        self.assertIsInstance(body["user_pk"], int)
659        self.assertIsInstance(body["user_uid"], str)
660        self.assertIsInstance(body["token"], str)
661        self.assertIsInstance(body["group_pk"], str)
662
663    def test_service_account_validation_without_group(self):
664        """Test service account creation without creating a group"""
665        self.client.force_login(self.admin)
666
667        response = self.client.post(
668            reverse("authentik_api:user-service-account"),
669            data={
670                "name": "no-group-service-account",
671                "create_group": False,
672            },
673        )
674        self.assertEqual(response.status_code, 200)
675
676        body = loads(response.content)
677        self.assertIn("username", body)
678        self.assertIn("user_uid", body)
679        self.assertIn("user_pk", body)
680        self.assertIn("token", body)
681        # Should NOT have group_pk when create_group=False
682        self.assertNotIn("group_pk", body)
683
684    def test_service_account_validation_duplicate_username(self):
685        """Test service account creation with duplicate username"""
686        self.client.force_login(self.admin)
687
688        # Create first service account
689        response = self.client.post(
690            reverse("authentik_api:user-service-account"),
691            data={
692                "name": "duplicate-test",
693                "create_group": True,
694            },
695        )
696        self.assertEqual(response.status_code, 200)
697
698        # Attempt to create second with same username
699        response = self.client.post(
700            reverse("authentik_api:user-service-account"),
701            data={
702                "name": "duplicate-test",
703                "create_group": True,
704            },
705        )
706        self.assertEqual(response.status_code, 400)
707        self.assertJSONEqual(
708            response.content,
709            {"name": ["This field must be unique."]},
710        )
711
712    def test_service_account_validation_invalid_create_group(self):
713        """Test service account creation with invalid create_group field"""
714        self.client.force_login(self.admin)
715
716        # Test with string instead of boolean
717        response = self.client.post(
718            reverse("authentik_api:user-service-account"),
719            data={
720                "name": "test-sa",
721                "create_group": "invalid",
722            },
723        )
724        self.assertEqual(response.status_code, 400)
725        self.assertJSONEqual(
726            response.content,
727            {"create_group": ["Must be a valid boolean."]},
728        )
729
730        # Test with number instead of boolean
731        response = self.client.post(
732            reverse("authentik_api:user-service-account"),
733            data={
734                "name": "test-sa",
735                "create_group": 123,
736            },
737        )
738        self.assertEqual(response.status_code, 400)
739        self.assertJSONEqual(
740            response.content,
741            {"create_group": ["Must be a valid boolean."]},
742        )
743
744    def test_service_account_validation_invalid_expiring(self):
745        """Test service account creation with invalid expiring field"""
746        self.client.force_login(self.admin)
747
748        # Test with string instead of boolean
749        response = self.client.post(
750            reverse("authentik_api:user-service-account"),
751            data={
752                "name": "test-sa",
753                "expiring": "invalid",
754            },
755        )
756        self.assertEqual(response.status_code, 400)
757        self.assertJSONEqual(
758            response.content,
759            {"expiring": ["Must be a valid boolean."]},
760        )
761
762    def test_service_account_validation_invalid_expires(self):
763        """Test service account creation with invalid expires field"""
764        self.client.force_login(self.admin)
765
766        # Test with invalid datetime string
767        response = self.client.post(
768            reverse("authentik_api:user-service-account"),
769            data={
770                "name": "test-sa",
771                "expires": "invalid-datetime",
772            },
773        )
774        self.assertEqual(response.status_code, 400)
775        self.assertJSONEqual(
776            response.content,
777            {
778                "expires": [
779                    "Datetime has wrong format. Use one of these formats instead: "
780                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
781                ]
782            },
783        )
784
785        # Test with invalid format
786        response = self.client.post(
787            reverse("authentik_api:user-service-account"),
788            data={
789                "name": "test-sa",
790                "expires": "2024-13-45",  # Invalid month/day
791            },
792        )
793        self.assertEqual(response.status_code, 400)
794        self.assertJSONEqual(
795            response.content,
796            {
797                "expires": [
798                    "Datetime has wrong format. Use one of these formats instead: "
799                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
800                ]
801            },
802        )
803
804    def test_service_account_validation_multiple_errors(self):
805        """Test service account creation with multiple validation errors"""
806        self.client.force_login(self.admin)
807
808        response = self.client.post(
809            reverse("authentik_api:user-service-account"),
810            data={
811                "name": "",  # Empty username
812                "create_group": "invalid",  # Invalid boolean
813                "expiring": 123,  # Invalid boolean
814                "expires": "not-a-date",  # Invalid datetime
815            },
816        )
817        self.assertEqual(response.status_code, 400)
818        self.assertJSONEqual(
819            response.content,
820            {
821                "name": ["This field may not be blank."],
822                "create_group": ["Must be a valid boolean."],
823                "expiring": ["Must be a valid boolean."],
824                "expires": [
825                    "Datetime has wrong format. Use one of these formats instead: "
826                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
827                ],
828            },
829        )
830
831    def test_service_account_validation_user_friendly_duplicate_error(self):
832        """Test that duplicate username returns user-friendly error, not database error"""
833        self.client.force_login(self.admin)
834
835        # Create first service account
836        response = self.client.post(
837            reverse("authentik_api:user-service-account"),
838            data={
839                "name": "duplicate-username-test",
840                "create_group": True,
841            },
842        )
843        self.assertEqual(response.status_code, 200)
844
845        # Attempt to create second with same username
846        response = self.client.post(
847            reverse("authentik_api:user-service-account"),
848            data={
849                "name": "duplicate-username-test",
850                "create_group": True,
851            },
852        )
853        self.assertEqual(response.status_code, 400)
854        self.assertJSONEqual(
855            response.content,
856            {"name": ["This field must be unique."]},
857        )
858
859    def test_filter_last_login(self):
860        """Test API filtering by last_login"""
861        from datetime import timedelta
862
863        from django.utils import timezone
864
865        User.objects.all().delete()
866        admin = create_test_admin_user()
867        self.client.force_login(admin)
868
869        # Create users with different last_login values
870        user_recent = create_test_user()
871        user_recent.last_login = timezone.now()
872        user_recent.save()
873
874        user_old = create_test_user()
875        user_old.last_login = timezone.now() - timedelta(days=400)  # Over 1 year ago
876        user_old.save()
877
878        user_never = create_test_user()
879        user_never.last_login = None  # Never logged in
880        user_never.save()
881
882        # Filter users who logged in before 1 year ago
883        one_year_ago = (timezone.now() - timedelta(days=365)).isoformat()
884        response = self.client.get(
885            reverse("authentik_api:user-list"),
886            data={"last_login__lt": one_year_ago},
887        )
888        self.assertEqual(response.status_code, 200)
889        body = loads(response.content)
890        self.assertEqual(len(body["results"]), 1)
891        self.assertEqual(body["results"][0]["pk"], user_old.pk)
892
893        # Filter users who have never logged in
894        response = self.client.get(
895            reverse("authentik_api:user-list"),
896            data={"last_login__isnull": True},
897        )
898        self.assertEqual(response.status_code, 200)
899        body = loads(response.content)
900        # Should include user_never and admin (who hasn't logged in via the app)
901        pks = [r["pk"] for r in body["results"]]
902        self.assertIn(user_never.pk, pks)
903
904    def test_sort_by_last_login(self):
905        """Test API sorting by last_login"""
906        from datetime import timedelta
907
908        from django.utils import timezone
909
910        User.objects.all().delete()
911        admin = create_test_admin_user()
912        self.client.force_login(admin)
913
914        user1 = create_test_user()
915        user1.last_login = timezone.now() - timedelta(days=10)
916        user1.save()
917
918        user2 = create_test_user()
919        user2.last_login = timezone.now() - timedelta(days=5)
920        user2.save()
921
922        # Ascending order (oldest first)
923        response = self.client.get(
924            reverse("authentik_api:user-list"),
925            data={"ordering": "last_login"},
926        )
927        self.assertEqual(response.status_code, 200)
928        body = loads(response.content)
929        # Users with null last_login come first, then user1 (older), then user2 (newer)
930        self.assertEqual(len(body["results"]), 3)
931
932        # Descending order (newest first)
933        response = self.client.get(
934            reverse("authentik_api:user-list"),
935            data={"ordering": "-last_login"},
936        )
937        self.assertEqual(response.status_code, 200)
938        body = loads(response.content)
939        # user2 should come before user1 (more recent login)
940        pks = [r["pk"] for r in body["results"]]
941        self.assertIn(user1.pk, pks)
942        self.assertIn(user2.pk, pks)
943        # Verify user2 comes before user1 in descending order
944        self.assertLess(pks.index(user2.pk), pks.index(user1.pk))

Test Users API

def setUp(self) -> None:
40    def setUp(self) -> None:
41        self.admin = create_test_admin_user()
42        self.user = create_test_user()

Hook method for setting up the test fixture before exercising it.

def test_filter_type(self):
58    def test_filter_type(self):
59        """Test API filtering by type"""
60        self.client.force_login(self.admin)
61        user = create_test_admin_user(type=UserTypes.EXTERNAL)
62        response = self.client.get(
63            reverse("authentik_api:user-list"),
64            data={
65                "type": UserTypes.EXTERNAL,
66                "username": user.username,
67            },
68        )
69        self.assertEqual(response.status_code, 200)

Test API filtering by type

def test_filter_is_superuser(self):
71    def test_filter_is_superuser(self):
72        """Test API filtering by superuser status"""
73        User.objects.all().delete()
74        admin = create_test_admin_user()
75        self.client.force_login(admin)
76        # Test superuser
77        response = self.client.get(
78            reverse("authentik_api:user-list"),
79            data={
80                "is_superuser": True,
81            },
82        )
83        self.assertEqual(response.status_code, 200)
84        body = loads(response.content)
85        self.assertEqual(len(body["results"]), 1)
86        self.assertEqual(body["results"][0]["username"], admin.username)
87        # Test non-superuser
88        user = create_test_user()
89        response = self.client.get(
90            reverse("authentik_api:user-list"),
91            data={
92                "is_superuser": False,
93            },
94        )
95        self.assertEqual(response.status_code, 200)
96        body = loads(response.content)
97        self.assertEqual(len(body["results"]), 1, body)
98        self.assertEqual(body["results"][0]["username"], user.username)

Test API filtering by superuser status

def test_list_with_groups(self):
100    def test_list_with_groups(self):
101        """Test listing with groups"""
102        self.client.force_login(self.admin)
103        response = self.client.get(reverse("authentik_api:user-list"), {"include_groups": "true"})
104        self.assertEqual(response.status_code, 200)

Test listing with groups

def test_recovery_no_flow(self):
106    def test_recovery_no_flow(self):
107        """Test user recovery link (no recovery flow set)"""
108        self.client.force_login(self.admin)
109        response = self.client.post(
110            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk})
111        )
112        self.assertEqual(response.status_code, 400)
113        self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})

Test user recovery link (no recovery flow set)

def test_set_password(self):
115    def test_set_password(self):
116        """Test Direct password set"""
117        self.client.force_login(self.admin)
118        new_pw = generate_key()
119        response = self.client.post(
120            reverse("authentik_api:user-set-password", kwargs={"pk": self.admin.pk}),
121            data={"password": new_pw},
122        )
123        self.assertEqual(response.status_code, 204)
124        self.admin.refresh_from_db()
125        self.assertTrue(self.admin.check_password(new_pw))

Test Direct password set

def test_set_password_blank(self):
127    def test_set_password_blank(self):
128        """Test Direct password set"""
129        self.client.force_login(self.admin)
130        response = self.client.post(
131            reverse("authentik_api:user-set-password", kwargs={"pk": self.admin.pk}),
132            data={"password": ""},
133        )
134        self.assertEqual(response.status_code, 400)
135        self.assertJSONEqual(response.content, {"password": ["This field may not be blank."]})

Test Direct password set

def test_set_password_hash(self):
137    def test_set_password_hash(self):
138        """Test setting a user's password from a hash."""
139        self.client.force_login(self.admin)
140        password = generate_key()
141        password_hash = make_password(password)
142        response = self._set_password_hash(self.user, password_hash)
143
144        self._assert_password_hash_set(self.user, password, password_hash, response)

Test setting a user's password from a hash.

def test_set_password_hash_invalid(self):
146    def test_set_password_hash_invalid(self):
147        """Test invalid password hashes are rejected."""
148        self.client.force_login(self.admin)
149        response = self._set_password_hash(self.user, INVALID_PASSWORD_HASH)
150
151        self.assertEqual(response.status_code, 400)
152        self.assertJSONEqual(
153            response.content,
154            {"password": [INVALID_PASSWORD_HASH_ERROR]},
155        )

Test invalid password hashes are rejected.

def test_recovery(self):
157    def test_recovery(self):
158        """Test user recovery link"""
159        flow = create_test_flow(
160            FlowDesignation.RECOVERY,
161            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
162        )
163        brand: Brand = create_test_brand()
164        brand.flow_recovery = flow
165        brand.save()
166        self.client.force_login(self.admin)
167        response = self.client.post(
168            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk})
169        )
170        self.assertEqual(response.status_code, 200)

Test user recovery link

def test_recovery_duration(self):
172    def test_recovery_duration(self):
173        """Test user recovery token duration"""
174        Token.objects.all().delete()
175        flow = create_test_flow(
176            FlowDesignation.RECOVERY,
177            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
178        )
179        brand: Brand = create_test_brand()
180        brand.flow_recovery = flow
181        brand.save()
182        self.client.force_login(self.admin)
183        response = self.client.post(
184            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
185            data={"token_duration": "days=33"},
186        )
187        self.assertEqual(response.status_code, 200)
188        expires = Token.objects.first().expires
189        expected_expires = now() + timedelta(days=33)
190        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))

Test user recovery token duration

def test_recovery_duration_update(self):
192    def test_recovery_duration_update(self):
193        """Test user recovery token duration update"""
194        Token.objects.all().delete()
195        flow = create_test_flow(
196            FlowDesignation.RECOVERY,
197            authentication=FlowAuthenticationRequirement.REQUIRE_UNAUTHENTICATED,
198        )
199        brand: Brand = create_test_brand()
200        brand.flow_recovery = flow
201        brand.save()
202        self.client.force_login(self.admin)
203        response = self.client.post(
204            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
205            data={"token_duration": "days=33"},
206        )
207        self.assertEqual(response.status_code, 200)
208        expires = Token.objects.first().expires
209        expected_expires = now() + timedelta(days=33)
210        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))
211        response = self.client.post(
212            reverse("authentik_api:user-recovery", kwargs={"pk": self.user.pk}),
213            data={"token_duration": "days=66"},
214        )
215        expires = Token.objects.first().expires
216        expected_expires = now() + timedelta(days=66)
217        self.assertTrue(timedelta(minutes=-1) < expected_expires - expires < timedelta(minutes=1))

Test user recovery token duration update

def test_recovery_email_no_flow(self):
219    def test_recovery_email_no_flow(self):
220        """Test user recovery link (no recovery flow set)"""
221        self.client.force_login(self.admin)
222        self.user.email = ""
223        self.user.save()
224        stage = EmailStage.objects.create(name="email")
225        response = self.client.post(
226            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
227            data={"email_stage": stage.pk},
228        )
229        self.assertEqual(response.status_code, 400)
230        self.assertJSONEqual(
231            response.content, {"non_field_errors": "User does not have an email address set."}
232        )
233        self.user.email = "foo@bar.baz"
234        self.user.save()
235        response = self.client.post(
236            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk}),
237            data={"email_stage": stage.pk},
238        )
239        self.assertEqual(response.status_code, 400)
240        self.assertJSONEqual(response.content, {"non_field_errors": "No recovery flow set."})

Test user recovery link (no recovery flow set)

def test_recovery_email_no_stage(self):
242    def test_recovery_email_no_stage(self):
243        """Test user recovery link (no email stage)"""
244        self.user.email = "foo@bar.baz"
245        self.user.save()
246        flow = create_test_flow(designation=FlowDesignation.RECOVERY)
247        brand: Brand = create_test_brand()
248        brand.flow_recovery = flow
249        brand.save()
250        self.client.force_login(self.admin)
251        response = self.client.post(
252            reverse("authentik_api:user-recovery-email", kwargs={"pk": self.user.pk})
253        )
254        self.assertEqual(response.status_code, 400)
255        self.assertJSONEqual(response.content, {"email_stage": ["This field is required."]})

Test user recovery link (no email stage)

def test_recovery_email(self):
257    def test_recovery_email(self):
258        """Test user recovery link"""
259        self.user.email = "foo@bar.baz"
260        self.user.save()
261        flow = create_test_flow(FlowDesignation.RECOVERY)
262        brand: Brand = create_test_brand()
263        brand.flow_recovery = flow
264        brand.save()
265
266        stage = EmailStage.objects.create(name="email")
267
268        self.client.force_login(self.admin)
269        response = self.client.post(
270            reverse(
271                "authentik_api:user-recovery-email",
272                kwargs={"pk": self.user.pk},
273            ),
274            data={"email_stage": stage.pk},
275        )
276        self.assertEqual(response.status_code, 204)

Test user recovery link

def test_service_account(self):
278    def test_service_account(self):
279        """Service account creation"""
280        self.client.force_login(self.admin)
281        response = self.client.post(reverse("authentik_api:user-service-account"))
282        self.assertEqual(response.status_code, 400)
283        response = self.client.post(
284            reverse("authentik_api:user-service-account"),
285            data={
286                "name": "test-sa",
287                "create_group": True,
288            },
289        )
290        self.assertEqual(response.status_code, 200)
291
292        user_filter = User.objects.filter(
293            username="test-sa",
294            type=UserTypes.SERVICE_ACCOUNT,
295            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
296        )
297        self.assertTrue(user_filter.exists())
298        user: User = user_filter.first()
299        self.assertFalse(user.has_usable_password())
300
301        token_filter = Token.objects.filter(user=user)
302        self.assertTrue(token_filter.exists())
303        self.assertTrue(token_filter.first().expiring)

Service account creation

def test_service_account_set_password_hash(self):
305    def test_service_account_set_password_hash(self):
306        """Service account password hash can be set through the API."""
307        self.client.force_login(self.admin)
308        response = self.client.post(
309            reverse("authentik_api:user-service-account"),
310            data={
311                "name": "test-sa",
312                "create_group": False,
313            },
314        )
315        self.assertEqual(response.status_code, 200, response.data)
316        body = loads(response.content)
317
318        user = User.objects.get(pk=body["user_pk"])
319        self.assertEqual(user.type, UserTypes.SERVICE_ACCOUNT)
320        self.assertFalse(user.has_usable_password())
321
322        password = generate_key()
323        password_hash = make_password(password)
324        response = self._set_password_hash(user, password_hash)
325
326        self._assert_password_hash_set(user, password, password_hash, response)

Service account password hash can be set through the API.

def test_service_account_no_expire(self):
328    def test_service_account_no_expire(self):
329        """Service account creation without token expiration"""
330        self.client.force_login(self.admin)
331        response = self.client.post(
332            reverse("authentik_api:user-service-account"),
333            data={
334                "name": "test-sa",
335                "create_group": True,
336                "expiring": False,
337            },
338        )
339        self.assertEqual(response.status_code, 200)
340
341        user_filter = User.objects.filter(
342            username="test-sa",
343            type=UserTypes.SERVICE_ACCOUNT,
344            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: False},
345        )
346        self.assertTrue(user_filter.exists())
347        user: User = user_filter.first()
348        self.assertFalse(user.has_usable_password())
349
350        token_filter = Token.objects.filter(user=user)
351        self.assertTrue(token_filter.exists())
352        self.assertFalse(token_filter.first().expiring)

Service account creation without token expiration

def test_service_account_with_custom_expire(self):
354    def test_service_account_with_custom_expire(self):
355        """Service account creation with custom token expiration date"""
356        self.client.force_login(self.admin)
357        expire_on = datetime(2050, 11, 11, 11, 11, 11).astimezone()
358        response = self.client.post(
359            reverse("authentik_api:user-service-account"),
360            data={
361                "name": "test-sa",
362                "create_group": True,
363                "expires": expire_on.isoformat(),
364            },
365        )
366        self.assertEqual(response.status_code, 200)
367
368        user_filter = User.objects.filter(
369            username="test-sa",
370            type=UserTypes.SERVICE_ACCOUNT,
371            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
372        )
373        self.assertTrue(user_filter.exists())
374        user: User = user_filter.first()
375        self.assertFalse(user.has_usable_password())
376
377        token_filter = Token.objects.filter(user=user)
378        self.assertTrue(token_filter.exists())
379        token = token_filter.first()
380        self.assertTrue(token.expiring)
381        self.assertEqual(token.expires, expire_on)

Service account creation with custom token expiration date

def test_service_account_invalid(self):
383    def test_service_account_invalid(self):
384        """Service account creation (twice with same name, expect error)"""
385        self.client.force_login(self.admin)
386        response = self.client.post(
387            reverse("authentik_api:user-service-account"),
388            data={
389                "name": "test-sa",
390                "create_group": True,
391            },
392        )
393        self.assertEqual(response.status_code, 200)
394
395        user_filter = User.objects.filter(
396            username="test-sa",
397            type=UserTypes.SERVICE_ACCOUNT,
398            attributes={USER_ATTRIBUTE_TOKEN_EXPIRING: True},
399        )
400        self.assertTrue(user_filter.exists())
401        user: User = user_filter.first()
402        self.assertFalse(user.has_usable_password())
403
404        token_filter = Token.objects.filter(user=user)
405        self.assertTrue(token_filter.exists())
406        self.assertTrue(token_filter.first().expiring)
407
408        response = self.client.post(
409            reverse("authentik_api:user-service-account"),
410            data={
411                "name": "test-sa",
412                "create_group": True,
413            },
414        )
415        self.assertEqual(response.status_code, 400)

Service account creation (twice with same name, expect error)

def test_paths(self):
417    def test_paths(self):
418        """Test path"""
419        self.client.force_login(self.admin)
420        response = self.client.get(
421            reverse("authentik_api:user-paths"),
422        )
423        self.assertEqual(response.status_code, 200)
424        expected = list(
425            User.objects.all()
426            .values("path")
427            .distinct()
428            .order_by("path")
429            .values_list("path", flat=True)
430        )
431        self.assertJSONEqual(response.content.decode(), {"paths": expected})

Test path

def test_path_valid(self):
433    def test_path_valid(self):
434        """Test path"""
435        self.client.force_login(self.admin)
436        response = self.client.post(
437            reverse("authentik_api:user-list"),
438            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "foo"},
439        )
440        self.assertEqual(response.status_code, 201)

Test path

def test_path_invalid(self):
442    def test_path_invalid(self):
443        """Test path (invalid)"""
444        self.client.force_login(self.admin)
445        response = self.client.post(
446            reverse("authentik_api:user-list"),
447            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "/foo"},
448        )
449        self.assertEqual(response.status_code, 400)
450        self.assertJSONEqual(
451            response.content.decode(), {"path": ["No leading or trailing slashes allowed."]}
452        )
453
454        self.client.force_login(self.admin)
455        response = self.client.post(
456            reverse("authentik_api:user-list"),
457            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": ""},
458        )
459        self.assertEqual(response.status_code, 400)
460        self.assertJSONEqual(response.content.decode(), {"path": ["This field may not be blank."]})
461
462        response = self.client.post(
463            reverse("authentik_api:user-list"),
464            data={"name": generate_id(), "username": generate_id(), "groups": [], "path": "foo/"},
465        )
466        self.assertEqual(response.status_code, 400)
467        self.assertJSONEqual(
468            response.content.decode(), {"path": ["No leading or trailing slashes allowed."]}
469        )
470
471        response = self.client.post(
472            reverse("authentik_api:user-list"),
473            data={
474                "name": generate_id(),
475                "username": generate_id(),
476                "groups": [],
477                "path": "fos//o",
478            },
479        )
480        self.assertEqual(response.status_code, 400)
481        self.assertJSONEqual(
482            response.content.decode(), {"path": ["No empty segments in user path allowed."]}
483        )

Test path (invalid)

def test_me(self):
485    def test_me(self):
486        """Test user's me endpoint"""
487        self.client.force_login(self.admin)
488        response = self.client.get(reverse("authentik_api:user-me"))
489        self.assertEqual(response.status_code, 200)

Test user's me endpoint

def test_session_delete(self):
491    def test_session_delete(self):
492        """Ensure sessions are deleted when a user is deactivated"""
493        user = create_test_admin_user()
494        session_id = generate_id()
495        session = Session.objects.create(
496            session_key=session_id,
497            last_ip="255.255.255.255",
498            last_user_agent="",
499        )
500        AuthenticatedSession.objects.create(
501            session=session,
502            user=user,
503        )
504
505        self.client.force_login(self.admin)
506        response = self.client.patch(
507            reverse("authentik_api:user-detail", kwargs={"pk": user.pk}),
508            data={
509                "is_active": False,
510            },
511        )
512        self.assertEqual(response.status_code, 200)
513
514        self.assertFalse(Session.objects.filter(session_key=session_id).exists())
515        self.assertFalse(
516            AuthenticatedSession.objects.filter(session__session_key=session_id).exists()
517        )

Ensure sessions are deleted when a user is deactivated

def test_sort_by_last_updated(self):
519    def test_sort_by_last_updated(self):
520        """Test API sorting by last_updated"""
521        User.objects.all().delete()
522        admin = create_test_admin_user()
523        self.client.force_login(admin)
524
525        user = create_test_user()
526        admin.first_name = "Sample change"
527        admin.last_name = "To trigger an update"
528        admin.save()
529
530        # Ascending
531        response = self.client.get(
532            reverse("authentik_api:user-list"),
533            data={
534                "ordering": "last_updated",
535            },
536        )
537        self.assertEqual(response.status_code, 200)
538
539        body = loads(response.content)
540        self.assertEqual(len(body["results"]), 2)
541        self.assertEqual(body["results"][0]["pk"], user.pk)
542
543        # Descending
544        response = self.client.get(
545            reverse("authentik_api:user-list"),
546            data={
547                "ordering": "-last_updated",
548            },
549        )
550        self.assertEqual(response.status_code, 200)
551
552        body = loads(response.content)
553        self.assertEqual(len(body["results"]), 2)
554        self.assertEqual(body["results"][0]["pk"], admin.pk)

Test API sorting by last_updated

def test_sort_by_date_joined(self):
556    def test_sort_by_date_joined(self):
557        """Test API sorting by date_joined"""
558        User.objects.all().delete()
559        admin = create_test_admin_user()
560        self.client.force_login(admin)
561
562        user = create_test_user()
563
564        response = self.client.get(
565            reverse("authentik_api:user-list"),
566            data={
567                "ordering": "date_joined",
568            },
569        )
570        self.assertEqual(response.status_code, 200)
571
572        body = loads(response.content)
573        self.assertEqual(len(body["results"]), 2)
574        self.assertEqual(body["results"][0]["pk"], admin.pk)
575
576        response = self.client.get(
577            reverse("authentik_api:user-list"),
578            data={
579                "ordering": "-date_joined",
580            },
581        )
582        self.assertEqual(response.status_code, 200)
583
584        body = loads(response.content)
585        self.assertEqual(len(body["results"]), 2)
586        self.assertEqual(body["results"][0]["pk"], user.pk)

Test API sorting by date_joined

def test_service_account_validation_empty_username(self):
588    def test_service_account_validation_empty_username(self):
589        """Test service account creation with empty/blank username validation"""
590        self.client.force_login(self.admin)
591
592        # Test with empty string
593        response = self.client.post(
594            reverse("authentik_api:user-service-account"),
595            data={
596                "name": "",
597                "create_group": True,
598            },
599        )
600        self.assertEqual(response.status_code, 400)
601        self.assertJSONEqual(
602            response.content,
603            {"name": ["This field may not be blank."]},
604        )
605
606        # Test with only whitespace
607        response = self.client.post(
608            reverse("authentik_api:user-service-account"),
609            data={
610                "name": "   ",
611                "create_group": True,
612            },
613        )
614        self.assertEqual(response.status_code, 400)
615        self.assertJSONEqual(
616            response.content,
617            {"name": ["This field may not be blank."]},
618        )
619
620        # Test with tab and newline characters
621        response = self.client.post(
622            reverse("authentik_api:user-service-account"),
623            data={
624                "name": "\t\n",
625                "create_group": True,
626            },
627        )
628        self.assertEqual(response.status_code, 400)
629        self.assertJSONEqual(
630            response.content,
631            {"name": ["This field may not be blank."]},
632        )

Test service account creation with empty/blank username validation

def test_service_account_validation_valid_username(self):
634    def test_service_account_validation_valid_username(self):
635        """Test service account creation with valid username"""
636        self.client.force_login(self.admin)
637
638        # Test with valid username
639        response = self.client.post(
640            reverse("authentik_api:user-service-account"),
641            data={
642                "name": "valid-service-account",
643                "create_group": True,
644            },
645        )
646        self.assertEqual(response.status_code, 200)
647
648        # Verify response structure
649        body = loads(response.content)
650        self.assertIn("username", body)
651        self.assertIn("user_uid", body)
652        self.assertIn("user_pk", body)
653        self.assertIn("group_pk", body)  # Should exist since create_group=True
654        self.assertIn("token", body)
655
656        # Verify field types
657        self.assertEqual(body["username"], "valid-service-account")
658        self.assertIsInstance(body["user_pk"], int)
659        self.assertIsInstance(body["user_uid"], str)
660        self.assertIsInstance(body["token"], str)
661        self.assertIsInstance(body["group_pk"], str)

Test service account creation with valid username

def test_service_account_validation_without_group(self):
663    def test_service_account_validation_without_group(self):
664        """Test service account creation without creating a group"""
665        self.client.force_login(self.admin)
666
667        response = self.client.post(
668            reverse("authentik_api:user-service-account"),
669            data={
670                "name": "no-group-service-account",
671                "create_group": False,
672            },
673        )
674        self.assertEqual(response.status_code, 200)
675
676        body = loads(response.content)
677        self.assertIn("username", body)
678        self.assertIn("user_uid", body)
679        self.assertIn("user_pk", body)
680        self.assertIn("token", body)
681        # Should NOT have group_pk when create_group=False
682        self.assertNotIn("group_pk", body)

Test service account creation without creating a group

def test_service_account_validation_duplicate_username(self):
684    def test_service_account_validation_duplicate_username(self):
685        """Test service account creation with duplicate username"""
686        self.client.force_login(self.admin)
687
688        # Create first service account
689        response = self.client.post(
690            reverse("authentik_api:user-service-account"),
691            data={
692                "name": "duplicate-test",
693                "create_group": True,
694            },
695        )
696        self.assertEqual(response.status_code, 200)
697
698        # Attempt to create second with same username
699        response = self.client.post(
700            reverse("authentik_api:user-service-account"),
701            data={
702                "name": "duplicate-test",
703                "create_group": True,
704            },
705        )
706        self.assertEqual(response.status_code, 400)
707        self.assertJSONEqual(
708            response.content,
709            {"name": ["This field must be unique."]},
710        )

Test service account creation with duplicate username

def test_service_account_validation_invalid_create_group(self):
712    def test_service_account_validation_invalid_create_group(self):
713        """Test service account creation with invalid create_group field"""
714        self.client.force_login(self.admin)
715
716        # Test with string instead of boolean
717        response = self.client.post(
718            reverse("authentik_api:user-service-account"),
719            data={
720                "name": "test-sa",
721                "create_group": "invalid",
722            },
723        )
724        self.assertEqual(response.status_code, 400)
725        self.assertJSONEqual(
726            response.content,
727            {"create_group": ["Must be a valid boolean."]},
728        )
729
730        # Test with number instead of boolean
731        response = self.client.post(
732            reverse("authentik_api:user-service-account"),
733            data={
734                "name": "test-sa",
735                "create_group": 123,
736            },
737        )
738        self.assertEqual(response.status_code, 400)
739        self.assertJSONEqual(
740            response.content,
741            {"create_group": ["Must be a valid boolean."]},
742        )

Test service account creation with invalid create_group field

def test_service_account_validation_invalid_expiring(self):
744    def test_service_account_validation_invalid_expiring(self):
745        """Test service account creation with invalid expiring field"""
746        self.client.force_login(self.admin)
747
748        # Test with string instead of boolean
749        response = self.client.post(
750            reverse("authentik_api:user-service-account"),
751            data={
752                "name": "test-sa",
753                "expiring": "invalid",
754            },
755        )
756        self.assertEqual(response.status_code, 400)
757        self.assertJSONEqual(
758            response.content,
759            {"expiring": ["Must be a valid boolean."]},
760        )

Test service account creation with invalid expiring field

def test_service_account_validation_invalid_expires(self):
762    def test_service_account_validation_invalid_expires(self):
763        """Test service account creation with invalid expires field"""
764        self.client.force_login(self.admin)
765
766        # Test with invalid datetime string
767        response = self.client.post(
768            reverse("authentik_api:user-service-account"),
769            data={
770                "name": "test-sa",
771                "expires": "invalid-datetime",
772            },
773        )
774        self.assertEqual(response.status_code, 400)
775        self.assertJSONEqual(
776            response.content,
777            {
778                "expires": [
779                    "Datetime has wrong format. Use one of these formats instead: "
780                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
781                ]
782            },
783        )
784
785        # Test with invalid format
786        response = self.client.post(
787            reverse("authentik_api:user-service-account"),
788            data={
789                "name": "test-sa",
790                "expires": "2024-13-45",  # Invalid month/day
791            },
792        )
793        self.assertEqual(response.status_code, 400)
794        self.assertJSONEqual(
795            response.content,
796            {
797                "expires": [
798                    "Datetime has wrong format. Use one of these formats instead: "
799                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
800                ]
801            },
802        )

Test service account creation with invalid expires field

def test_service_account_validation_multiple_errors(self):
804    def test_service_account_validation_multiple_errors(self):
805        """Test service account creation with multiple validation errors"""
806        self.client.force_login(self.admin)
807
808        response = self.client.post(
809            reverse("authentik_api:user-service-account"),
810            data={
811                "name": "",  # Empty username
812                "create_group": "invalid",  # Invalid boolean
813                "expiring": 123,  # Invalid boolean
814                "expires": "not-a-date",  # Invalid datetime
815            },
816        )
817        self.assertEqual(response.status_code, 400)
818        self.assertJSONEqual(
819            response.content,
820            {
821                "name": ["This field may not be blank."],
822                "create_group": ["Must be a valid boolean."],
823                "expiring": ["Must be a valid boolean."],
824                "expires": [
825                    "Datetime has wrong format. Use one of these formats instead: "
826                    "YYYY-MM-DDThh:mm[:ss[.uuuuuu]][+HH:MM|-HH:MM|Z]."
827                ],
828            },
829        )

Test service account creation with multiple validation errors

def test_service_account_validation_user_friendly_duplicate_error(self):
831    def test_service_account_validation_user_friendly_duplicate_error(self):
832        """Test that duplicate username returns user-friendly error, not database error"""
833        self.client.force_login(self.admin)
834
835        # Create first service account
836        response = self.client.post(
837            reverse("authentik_api:user-service-account"),
838            data={
839                "name": "duplicate-username-test",
840                "create_group": True,
841            },
842        )
843        self.assertEqual(response.status_code, 200)
844
845        # Attempt to create second with same username
846        response = self.client.post(
847            reverse("authentik_api:user-service-account"),
848            data={
849                "name": "duplicate-username-test",
850                "create_group": True,
851            },
852        )
853        self.assertEqual(response.status_code, 400)
854        self.assertJSONEqual(
855            response.content,
856            {"name": ["This field must be unique."]},
857        )

Test that duplicate username returns user-friendly error, not database error

def test_filter_last_login(self):
859    def test_filter_last_login(self):
860        """Test API filtering by last_login"""
861        from datetime import timedelta
862
863        from django.utils import timezone
864
865        User.objects.all().delete()
866        admin = create_test_admin_user()
867        self.client.force_login(admin)
868
869        # Create users with different last_login values
870        user_recent = create_test_user()
871        user_recent.last_login = timezone.now()
872        user_recent.save()
873
874        user_old = create_test_user()
875        user_old.last_login = timezone.now() - timedelta(days=400)  # Over 1 year ago
876        user_old.save()
877
878        user_never = create_test_user()
879        user_never.last_login = None  # Never logged in
880        user_never.save()
881
882        # Filter users who logged in before 1 year ago
883        one_year_ago = (timezone.now() - timedelta(days=365)).isoformat()
884        response = self.client.get(
885            reverse("authentik_api:user-list"),
886            data={"last_login__lt": one_year_ago},
887        )
888        self.assertEqual(response.status_code, 200)
889        body = loads(response.content)
890        self.assertEqual(len(body["results"]), 1)
891        self.assertEqual(body["results"][0]["pk"], user_old.pk)
892
893        # Filter users who have never logged in
894        response = self.client.get(
895            reverse("authentik_api:user-list"),
896            data={"last_login__isnull": True},
897        )
898        self.assertEqual(response.status_code, 200)
899        body = loads(response.content)
900        # Should include user_never and admin (who hasn't logged in via the app)
901        pks = [r["pk"] for r in body["results"]]
902        self.assertIn(user_never.pk, pks)

Test API filtering by last_login

def test_sort_by_last_login(self):
904    def test_sort_by_last_login(self):
905        """Test API sorting by last_login"""
906        from datetime import timedelta
907
908        from django.utils import timezone
909
910        User.objects.all().delete()
911        admin = create_test_admin_user()
912        self.client.force_login(admin)
913
914        user1 = create_test_user()
915        user1.last_login = timezone.now() - timedelta(days=10)
916        user1.save()
917
918        user2 = create_test_user()
919        user2.last_login = timezone.now() - timedelta(days=5)
920        user2.save()
921
922        # Ascending order (oldest first)
923        response = self.client.get(
924            reverse("authentik_api:user-list"),
925            data={"ordering": "last_login"},
926        )
927        self.assertEqual(response.status_code, 200)
928        body = loads(response.content)
929        # Users with null last_login come first, then user1 (older), then user2 (newer)
930        self.assertEqual(len(body["results"]), 3)
931
932        # Descending order (newest first)
933        response = self.client.get(
934            reverse("authentik_api:user-list"),
935            data={"ordering": "-last_login"},
936        )
937        self.assertEqual(response.status_code, 200)
938        body = loads(response.content)
939        # user2 should come before user1 (more recent login)
940        pks = [r["pk"] for r in body["results"]]
941        self.assertIn(user1.pk, pks)
942        self.assertIn(user2.pk, pks)
943        # Verify user2 comes before user1 in descending order
944        self.assertLess(pks.index(user2.pk), pks.index(user1.pk))

Test API sorting by last_login

class TestUsersAPIGroupRoleValidation(rest_framework.test.APITestCase):
 947class TestUsersAPIGroupRoleValidation(APITestCase):
 948    """Test that PATCH /api/v3/core/users/{pk}/ enforces group and role permission checks."""
 949
 950    def setUp(self) -> None:
 951        self.actor = create_test_user()
 952        self.target = create_test_user()
 953
 954    def _patch(self, data: dict):
 955        self.client.force_login(self.actor)
 956        return self.client.patch(
 957            reverse("authentik_api:user-detail", kwargs={"pk": self.target.pk}),
 958            data=data,
 959            content_type="application/json",
 960        )
 961
 962    def test_patch_superuser_group_no_perm(self):
 963        """Assigning a superuser group without enable_group_superuser must be rejected."""
 964        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 965        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 966        group = Group.objects.create(name=generate_id(), is_superuser=True)
 967        res = self._patch({"groups": [str(group.pk)]})
 968        self.assertEqual(res.status_code, 400)
 969
 970    def test_patch_superuser_group_with_perm(self):
 971        """Assigning a superuser group with enable_group_superuser must succeed."""
 972        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 973        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 974        self.actor.assign_perms_to_managed_role("authentik_core.enable_group_superuser")
 975        group = Group.objects.create(name=generate_id(), is_superuser=True)
 976        res = self._patch({"groups": [str(group.pk)]})
 977        self.assertEqual(res.status_code, 200)
 978
 979    def test_patch_non_superuser_group_no_perm(self):
 980        """Assigning a non-superuser group without special permission must succeed."""
 981        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 982        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 983        group = Group.objects.create(name=generate_id(), is_superuser=False)
 984        res = self._patch({"groups": [str(group.pk)]})
 985        self.assertEqual(res.status_code, 200)
 986
 987    def test_patch_existing_superuser_group_no_perm(self):
 988        """Keeping an existing superuser group membership without the permission must succeed."""
 989        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 990        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
 991        group = Group.objects.create(name=generate_id(), is_superuser=True)
 992        self.target.groups.add(group)
 993        res = self._patch({"groups": [str(group.pk)]})
 994        self.assertEqual(res.status_code, 200)
 995
 996    def test_patch_role_no_perm(self):
 997        """Assigning a new role without change_role must be rejected."""
 998        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 999        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1000        role = Role.objects.create(name=generate_id())
1001        res = self._patch({"roles": [str(role.pk)]})
1002        self.assertEqual(res.status_code, 400)
1003
1004    def test_patch_role_with_perm(self):
1005        """Assigning a new role with change_role must succeed."""
1006        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
1007        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1008        self.actor.assign_perms_to_managed_role("authentik_rbac.change_role")
1009        role = Role.objects.create(name=generate_id())
1010        res = self._patch({"roles": [str(role.pk)]})
1011        self.assertEqual(res.status_code, 200)
1012
1013    def test_patch_existing_role_no_perm(self):
1014        """Keeping an existing role without change_role must succeed."""
1015        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
1016        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1017        role = Role.objects.create(name=generate_id())
1018        self.target.roles.add(role)
1019        res = self._patch({"roles": [str(role.pk)]})
1020        self.assertEqual(res.status_code, 200)

Test that PATCH /api/v3/core/users/{pk}/ enforces group and role permission checks.

def setUp(self) -> None:
950    def setUp(self) -> None:
951        self.actor = create_test_user()
952        self.target = create_test_user()

Hook method for setting up the test fixture before exercising it.

def test_patch_superuser_group_no_perm(self):
962    def test_patch_superuser_group_no_perm(self):
963        """Assigning a superuser group without enable_group_superuser must be rejected."""
964        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
965        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
966        group = Group.objects.create(name=generate_id(), is_superuser=True)
967        res = self._patch({"groups": [str(group.pk)]})
968        self.assertEqual(res.status_code, 400)

Assigning a superuser group without enable_group_superuser must be rejected.

def test_patch_superuser_group_with_perm(self):
970    def test_patch_superuser_group_with_perm(self):
971        """Assigning a superuser group with enable_group_superuser must succeed."""
972        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
973        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
974        self.actor.assign_perms_to_managed_role("authentik_core.enable_group_superuser")
975        group = Group.objects.create(name=generate_id(), is_superuser=True)
976        res = self._patch({"groups": [str(group.pk)]})
977        self.assertEqual(res.status_code, 200)

Assigning a superuser group with enable_group_superuser must succeed.

def test_patch_non_superuser_group_no_perm(self):
979    def test_patch_non_superuser_group_no_perm(self):
980        """Assigning a non-superuser group without special permission must succeed."""
981        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
982        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
983        group = Group.objects.create(name=generate_id(), is_superuser=False)
984        res = self._patch({"groups": [str(group.pk)]})
985        self.assertEqual(res.status_code, 200)

Assigning a non-superuser group without special permission must succeed.

def test_patch_existing_superuser_group_no_perm(self):
987    def test_patch_existing_superuser_group_no_perm(self):
988        """Keeping an existing superuser group membership without the permission must succeed."""
989        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
990        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
991        group = Group.objects.create(name=generate_id(), is_superuser=True)
992        self.target.groups.add(group)
993        res = self._patch({"groups": [str(group.pk)]})
994        self.assertEqual(res.status_code, 200)

Keeping an existing superuser group membership without the permission must succeed.

def test_patch_role_no_perm(self):
 996    def test_patch_role_no_perm(self):
 997        """Assigning a new role without change_role must be rejected."""
 998        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
 999        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1000        role = Role.objects.create(name=generate_id())
1001        res = self._patch({"roles": [str(role.pk)]})
1002        self.assertEqual(res.status_code, 400)

Assigning a new role without change_role must be rejected.

def test_patch_role_with_perm(self):
1004    def test_patch_role_with_perm(self):
1005        """Assigning a new role with change_role must succeed."""
1006        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
1007        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1008        self.actor.assign_perms_to_managed_role("authentik_rbac.change_role")
1009        role = Role.objects.create(name=generate_id())
1010        res = self._patch({"roles": [str(role.pk)]})
1011        self.assertEqual(res.status_code, 200)

Assigning a new role with change_role must succeed.

def test_patch_existing_role_no_perm(self):
1013    def test_patch_existing_role_no_perm(self):
1014        """Keeping an existing role without change_role must succeed."""
1015        self.actor.assign_perms_to_managed_role("authentik_core.view_user")
1016        self.actor.assign_perms_to_managed_role("authentik_core.change_user", self.target)
1017        role = Role.objects.create(name=generate_id())
1018        self.target.roles.add(role)
1019        res = self._patch({"roles": [str(role.pk)]})
1020        self.assertEqual(res.status_code, 200)

Keeping an existing role without change_role must succeed.