authentik.tenants.tests.utils
from django.core.management import call_command
from django.db import connection, connections
from django_tenants.utils import get_public_schema_name, get_tenant_base_schema, schema_context
from rest_framework.test import APITransactionTestCase

from authentik.tenants.models import Tenant


class TenantAPITestCase(APITransactionTestCase):
    """API transaction test case with helpers for tests that create extra database schemas.

    Before each test the ``Tenant`` row for the base schema is created or updated and
    that schema is migrated; after each test every schema whose name starts with
    ``t_`` is dropped again.
    """

    # Overridden to also remove additional schemas we may have created
    def _fixture_teardown(self):
        """Drop all ``t_``-prefixed schemas on every test database, then run the base teardown."""
        for db_name in self._databases_names(include_mirrors=False):
            db = connections[db_name]
            db.set_schema_to_public()
            with db.cursor() as cursor:
                # The pattern is anchored with ``^``: an unanchored ``t_.*`` matches any
                # schema that merely *contains* ``t_`` (e.g. ``pg_toast_temp_1``), which
                # would make the cleanup try to drop schemas the tests never created.
                cursor.execute(
                    "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname ~ '^t_.*'"
                )
                schemas = cursor.fetchall()
                for row in schemas:
                    # Identifiers cannot be passed as query parameters, so quote the
                    # name instead of interpolating it raw into the statement.
                    schema = db.ops.quote_name(row[0])
                    cursor.execute(f"DROP SCHEMA {schema} CASCADE")  # nosec
        super()._fixture_teardown()

    def setUp(self):
        """Create or update the ``Tenant`` row for the base schema and migrate that schema."""
        with schema_context(get_public_schema_name()):
            Tenant.objects.update_or_create(
                defaults={"name": "Template", "ready": False},
                schema_name=get_tenant_base_schema(),
            )
        call_command("migrate_schemas", schema=get_tenant_base_schema(), tenant=True)

    def assertSchemaExists(self, schema_name):
        """Assert that ``schema_name`` exists and has as many tables as the base schema."""
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM information_schema.schemata WHERE schema_name = %(schema_name)s",  # nosec
                {"schema_name": schema_name},
            )
            self.assertEqual(cursor.rowcount, 1)

            # The table count of the base schema is the reference value.
            cursor.execute(
                "SELECT * FROM information_schema.tables WHERE table_schema = %(schema_name)s",
                {"schema_name": get_tenant_base_schema()},
            )
            expected_tables = cursor.rowcount
            cursor.execute(
                "SELECT * FROM information_schema.tables WHERE table_schema = %(schema_name)s",  # nosec
                {"schema_name": schema_name},
            )
            self.assertEqual(cursor.rowcount, expected_tables)

    def assertSchemaDoesntExist(self, schema_name):
        """Assert that no schema named ``schema_name`` is listed in ``information_schema``."""
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM information_schema.schemata WHERE schema_name = %(schema_name)s",  # nosec
                {"schema_name": schema_name},
            )
            self.assertEqual(cursor.rowcount, 0)
class TenantAPITestCase(APITransactionTestCase):
    """API transaction test case with helpers for tests that create extra database schemas.

    Before each test the ``Tenant`` row for the base schema is created or updated and
    that schema is migrated; after each test every schema whose name starts with
    ``t_`` is dropped again.
    """

    # Overridden to also remove additional schemas we may have created
    def _fixture_teardown(self):
        """Drop all ``t_``-prefixed schemas on every test database, then run the base teardown."""
        for db_name in self._databases_names(include_mirrors=False):
            db = connections[db_name]
            db.set_schema_to_public()
            with db.cursor() as cursor:
                # The pattern is anchored with ``^``: an unanchored ``t_.*`` matches any
                # schema that merely *contains* ``t_`` (e.g. ``pg_toast_temp_1``), which
                # would make the cleanup try to drop schemas the tests never created.
                cursor.execute(
                    "SELECT nspname FROM pg_catalog.pg_namespace WHERE nspname ~ '^t_.*'"
                )
                schemas = cursor.fetchall()
                for row in schemas:
                    # Identifiers cannot be passed as query parameters, so quote the
                    # name instead of interpolating it raw into the statement.
                    schema = db.ops.quote_name(row[0])
                    cursor.execute(f"DROP SCHEMA {schema} CASCADE")  # nosec
        super()._fixture_teardown()

    def setUp(self):
        """Create or update the ``Tenant`` row for the base schema and migrate that schema."""
        with schema_context(get_public_schema_name()):
            Tenant.objects.update_or_create(
                defaults={"name": "Template", "ready": False},
                schema_name=get_tenant_base_schema(),
            )
        call_command("migrate_schemas", schema=get_tenant_base_schema(), tenant=True)

    def assertSchemaExists(self, schema_name):
        """Assert that ``schema_name`` exists and has as many tables as the base schema."""
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM information_schema.schemata WHERE schema_name = %(schema_name)s",  # nosec
                {"schema_name": schema_name},
            )
            self.assertEqual(cursor.rowcount, 1)

            # The table count of the base schema is the reference value.
            cursor.execute(
                "SELECT * FROM information_schema.tables WHERE table_schema = %(schema_name)s",
                {"schema_name": get_tenant_base_schema()},
            )
            expected_tables = cursor.rowcount
            cursor.execute(
                "SELECT * FROM information_schema.tables WHERE table_schema = %(schema_name)s",  # nosec
                {"schema_name": schema_name},
            )
            self.assertEqual(cursor.rowcount, expected_tables)

    def assertSchemaDoesntExist(self, schema_name):
        """Assert that no schema named ``schema_name`` is listed in ``information_schema``."""
        with connection.cursor() as cursor:
            cursor.execute(
                "SELECT * FROM information_schema.schemata WHERE schema_name = %(schema_name)s",  # nosec
                {"schema_name": schema_name},
            )
            self.assertEqual(cursor.rowcount, 0)
A class whose instances are single test cases.
By default, the test code itself should be placed in a method named 'runTest'.
If the fixture may be used for many test cases, create as many test methods as are needed. When instantiating such a TestCase subclass, specify in the constructor arguments the name of the test method that the instance is to execute.
Test authors should subclass TestCase for their own tests. Construction and deconstruction of the test's environment ('fixture') can be implemented by overriding the 'setUp' and 'tearDown' methods respectively.
If it is necessary to override the __init__ method, the base class __init__ method must always be called. It is important that subclasses should not change the signature of their __init__ method, since instances of the classes are instantiated automatically by parts of the framework in order to be run.
When subclassing TestCase, you can set these attributes:
- failureException: determines which exception will be raised when the instance's assertion methods fail; test methods raising this exception will be deemed to have 'failed' rather than 'errored'.
- longMessage: determines whether long messages (including repr of objects used in assert methods) will be printed on failure in addition to any explicit message passed.
- maxDiff: sets the maximum length of a diff in failure messages by assert methods using difflib. It is looked up as an instance attribute so can be configured by individual tests if required.
def setUp(self):
    """Prepare the base schema: upsert its ``Tenant`` row, then migrate it.

    The ``Tenant`` row is written while the public schema is active; the
    ``migrate_schemas`` management command is then run for the base schema
    with ``tenant=True``.
    """
    base_schema = get_tenant_base_schema()
    public_schema = get_public_schema_name()

    with schema_context(public_schema):
        Tenant.objects.update_or_create(
            defaults={"name": "Template", "ready": False},
            schema_name=base_schema,
        )

    call_command("migrate_schemas", schema=base_schema, tenant=True)
Hook method for setting up the test fixture before exercising it.
def assertSchemaExists(self, schema_name):
    """Check that ``schema_name`` is present and mirrors the base schema's table count.

    Fails if ``information_schema.schemata`` does not list exactly one schema with
    that name, or if the number of tables listed for it differs from the number
    listed for the schema returned by ``get_tenant_base_schema()``.
    """
    schema_query = (
        "SELECT * FROM information_schema.schemata WHERE schema_name = %(schema_name)s"  # nosec
    )
    tables_query = (
        "SELECT * FROM information_schema.tables WHERE table_schema = %(schema_name)s"  # nosec
    )
    target_params = {"schema_name": schema_name}

    with connection.cursor() as cur:
        # The schema itself must be listed exactly once.
        cur.execute(schema_query, target_params)
        self.assertEqual(cur.rowcount, 1)

        # Reference value: number of tables in the base schema.
        cur.execute(tables_query, {"schema_name": get_tenant_base_schema()})
        expected_tables = cur.rowcount

        # The schema under test must list the same number of tables.
        cur.execute(tables_query, target_params)
        self.assertEqual(cur.rowcount, expected_tables)