authentik.sources.plex.migrations.0006_migrate_groupplexsourceconnection

 1from django.db import migrations
 2
 3
 4def create_missing_groupplexsourceconnection(apps, schema_editor):
 5    db_alias = schema_editor.connection.alias
 6
 7    GroupSourceConnection = apps.get_model("authentik_core", "GroupSourceConnection")
 8    PlexSource = apps.get_model("authentik_sources_plex", "PlexSource")
 9    GroupPlexSourceConnection = apps.get_model(
10        "authentik_sources_plex", "GroupPlexSourceConnection"
11    )
12
13    for source in PlexSource.objects.using(db_alias).all():
14        for gsc in GroupSourceConnection.objects.using(db_alias).filter(source=source):
15            if GroupPlexSourceConnection.objects.using(db_alias).filter(pk=gsc.pk).exists():
16                continue
17            gpsc = GroupPlexSourceConnection(pk=gsc.pk)
18            gpsc.save(using=db_alias)
19
20
21class Migration(migrations.Migration):
22
23    dependencies = [
24        (
25            "authentik_sources_plex",
26            "0005_migrate_userplexsourceconnection_identifier",
27        ),
28        ("authentik_core", "0044_usersourceconnection_new_identifier"),
29    ]
30
31    operations = [
32        migrations.RunPython(
33            code=create_missing_groupplexsourceconnection, reverse_code=migrations.RunPython.noop
34        ),
35    ]
def create_missing_groupplexsourceconnection(apps, schema_editor):
 5def create_missing_groupplexsourceconnection(apps, schema_editor):
 6    db_alias = schema_editor.connection.alias
 7
 8    GroupSourceConnection = apps.get_model("authentik_core", "GroupSourceConnection")
 9    PlexSource = apps.get_model("authentik_sources_plex", "PlexSource")
10    GroupPlexSourceConnection = apps.get_model(
11        "authentik_sources_plex", "GroupPlexSourceConnection"
12    )
13
14    for source in PlexSource.objects.using(db_alias).all():
15        for gsc in GroupSourceConnection.objects.using(db_alias).filter(source=source):
16            if GroupPlexSourceConnection.objects.using(db_alias).filter(pk=gsc.pk).exists():
17                continue
18            gpsc = GroupPlexSourceConnection(pk=gsc.pk)
19            gpsc.save(using=db_alias)
class Migration(django.db.migrations.migration.Migration):
22class Migration(migrations.Migration):
23
24    dependencies = [
25        (
26            "authentik_sources_plex",
27            "0005_migrate_userplexsourceconnection_identifier",
28        ),
29        ("authentik_core", "0044_usersourceconnection_new_identifier"),
30    ]
31
32    operations = [
33        migrations.RunPython(
34            code=create_missing_groupplexsourceconnection, reverse_code=migrations.RunPython.noop
35        ),
36    ]

The base class for all migrations.

Migration files will import this from django.db.migrations.Migration and subclass it as a class called Migration. It will have one or more of the following attributes:

  • operations: A list of Operation instances, probably from django.db.migrations.operations
  • dependencies: A list of tuples of (app_path, migration_name)
  • run_before: A list of tuples of (app_path, migration_name)
  • replaces: A list of migration_names

Note that all migrations come out of migrations and into the Loader or Graph as instances, having been initialized with their app label and name.

dependencies = [('authentik_sources_plex', '0005_migrate_userplexsourceconnection_identifier'), ('authentik_core', '0044_usersourceconnection_new_identifier')]
operations = [<RunPython code=<function create_missing_groupplexsourceconnection>, reverse_code=<function RunPython.noop>>]