authentik.stages.authenticator_webauthn.tasks

MDS Helpers

 1"""MDS Helpers"""
 2
 3from functools import lru_cache
 4from json import loads
 5from pathlib import Path
 6
 7from django.core.cache import cache
 8from django.db.transaction import atomic
 9from django.utils.translation import gettext_lazy as _
10from dramatiq.actor import actor
11from fido2.mds3 import filter_revoked, parse_blob
12
13from authentik.stages.authenticator_webauthn.models import (
14    UNKNOWN_DEVICE_TYPE_AAGUID,
15    WebAuthnDeviceType,
16)
17from authentik.tasks.middleware import CurrentTask
18
19CACHE_KEY_MDS_NO = "goauthentik.io/stages/authenticator_webauthn/mds_no"
20AAGUID_BLOB_PATH = Path(__file__).parent / "mds" / "aaguid.json"
21MDS_BLOB_PATH = Path(__file__).parent / "mds" / "blob.jwt"
22MDS_CA_PATH = Path(__file__).parent / "mds" / "root-r3.crt"
23
24
@lru_cache
def mds_ca() -> bytes:
    """Return the raw bytes of the MDS signature CA (GlobalSign Root CA - R3).

    The result is memoized by ``lru_cache``, so the file at ``MDS_CA_PATH`` is
    only read on the first call.
    """
    with open(MDS_CA_PATH, mode="rb") as ca_file:
        certificate = ca_file.read()
    return certificate
30
31
@actor(description=_("Background task to import FIDO Alliance MDS blob and AAGUIDs into database."))
def webauthn_mds_import(force=False):
    """Import the bundled FIDO Alliance MDS blob and AAGUID list into the database.

    The entries of the MDS blob are only processed when the blob's number differs
    from the one stored in the cache, or when ``force`` is truthy. The placeholder
    device type for unknown devices and the AAGUID JSON file are imported on every
    run.
    """
    self = CurrentTask.get_task()
    # Options shared by both upserts: rows are matched on their AAGUID, and rows
    # that already exist get their description and icon overwritten.
    upsert_options = {
        "update_conflicts": True,
        "update_fields": ["description", "icon"],
        "unique_fields": ["aaguid"],
    }

    with open(MDS_BLOB_PATH, mode="rb") as blob_file:
        blob = parse_blob(blob_file.read(), mds_ca())

    # The "unknown device" placeholder is always part of the first upsert.
    device_types = [
        WebAuthnDeviceType(
            aaguid=UNKNOWN_DEVICE_TYPE_AAGUID,
            description="authentik: Unknown devices",
        )
    ]
    rejected_aaguids = []

    cached_no = cache.get(CACHE_KEY_MDS_NO)
    blob_changed = cached_no != blob.no
    if blob_changed or force:
        for entry in blob.entries:
            aaguid = entry.aaguid
            if not aaguid:
                # Entries without an AAGUID cannot be keyed in the table.
                continue
            if filter_revoked(entry):
                statement = entry.metadata_statement
                device_types.append(
                    WebAuthnDeviceType(
                        aaguid=str(aaguid),
                        description=statement.description,
                        icon=statement.icon,
                    )
                )
            else:
                # Entries rejected by filter_revoked are removed from the database.
                rejected_aaguids.append(str(aaguid))

    with atomic():
        WebAuthnDeviceType.objects.bulk_create(device_types, **upsert_options)
        WebAuthnDeviceType.objects.filter(aaguid__in=rejected_aaguids).delete()
    if blob_changed:
        # Remember the imported blob number so unchanged blobs are skipped next time.
        cache.set(CACHE_KEY_MDS_NO, blob.no)

    with open(AAGUID_BLOB_PATH, mode="rb") as aaguid_file:
        aaguid_details = loads(aaguid_file.read())
    named_device_types = []
    for aaguid, details in aaguid_details.items():
        named_device_types.append(
            WebAuthnDeviceType(
                aaguid=str(aaguid),
                description=details.get("name"),
                icon=details.get("icon_light"),
            )
        )
    with atomic():
        WebAuthnDeviceType.objects.bulk_create(named_device_types, **upsert_options)

    self.info("Successfully imported FIDO Alliance MDS blobs and AAGUIDs.")
CACHE_KEY_MDS_NO = 'goauthentik.io/stages/authenticator_webauthn/mds_no'
AAGUID_BLOB_PATH = PosixPath('/home/runner/work/authentik/authentik/authentik/stages/authenticator_webauthn/mds/aaguid.json')
MDS_BLOB_PATH = PosixPath('/home/runner/work/authentik/authentik/authentik/stages/authenticator_webauthn/mds/blob.jwt')
MDS_CA_PATH = PosixPath('/home/runner/work/authentik/authentik/authentik/stages/authenticator_webauthn/mds/root-r3.crt')
@lru_cache
def mds_ca() -> bytes:
@lru_cache
def mds_ca() -> bytes:
    """Return the raw bytes of the MDS signature CA (GlobalSign Root CA - R3).

    The result is memoized by ``lru_cache``, so the file at ``MDS_CA_PATH`` is
    only read on the first call.
    """
    with open(MDS_CA_PATH, mode="rb") as ca_file:
        certificate = ca_file.read()
    return certificate

Cache MDS Signature CA, GlobalSign Root CA - R3

webauthn_mds_import = Actor(<function webauthn_mds_import>, queue_name='default', actor_name='webauthn_mds_import')

Background task to import FIDO Alliance MDS blob and AAGUIDs into database