authentik.stages.authenticator_webauthn.tasks
MDS Helpers
1"""MDS Helpers""" 2 3from functools import lru_cache 4from json import loads 5from pathlib import Path 6 7from django.core.cache import cache 8from django.db.transaction import atomic 9from django.utils.translation import gettext_lazy as _ 10from dramatiq.actor import actor 11from fido2.mds3 import filter_revoked, parse_blob 12 13from authentik.stages.authenticator_webauthn.models import ( 14 UNKNOWN_DEVICE_TYPE_AAGUID, 15 WebAuthnDeviceType, 16) 17from authentik.tasks.middleware import CurrentTask 18 19CACHE_KEY_MDS_NO = "goauthentik.io/stages/authenticator_webauthn/mds_no" 20AAGUID_BLOB_PATH = Path(__file__).parent / "mds" / "aaguid.json" 21MDS_BLOB_PATH = Path(__file__).parent / "mds" / "blob.jwt" 22MDS_CA_PATH = Path(__file__).parent / "mds" / "root-r3.crt" 23 24 25@lru_cache 26def mds_ca() -> bytes: 27 """Cache MDS Signature CA, GlobalSign Root CA - R3""" 28 with open(MDS_CA_PATH, mode="rb") as _raw_root: 29 return _raw_root.read() 30 31 32@actor(description=_("Background task to import FIDO Alliance MDS blob and AAGUIDs into database.")) 33def webauthn_mds_import(force=False): 34 """Background task to import FIDO Alliance MDS blob and AAGUIDs into database""" 35 self = CurrentTask.get_task() 36 with open(MDS_BLOB_PATH, mode="rb") as _raw_blob: 37 blob = parse_blob(_raw_blob.read(), mds_ca()) 38 to_create_update = [ 39 WebAuthnDeviceType( 40 aaguid=UNKNOWN_DEVICE_TYPE_AAGUID, 41 description="authentik: Unknown devices", 42 ) 43 ] 44 to_delete = [] 45 46 mds_no = cache.get(CACHE_KEY_MDS_NO) 47 if mds_no != blob.no or force: 48 for entry in blob.entries: 49 aaguid = entry.aaguid 50 if not aaguid: 51 continue 52 if not filter_revoked(entry): 53 to_delete.append(str(aaguid)) 54 continue 55 metadata = entry.metadata_statement 56 to_create_update.append( 57 WebAuthnDeviceType( 58 aaguid=str(aaguid), 59 description=metadata.description, 60 icon=metadata.icon, 61 ) 62 ) 63 with atomic(): 64 WebAuthnDeviceType.objects.bulk_create( 65 to_create_update, 66 update_conflicts=True, 67 update_fields=["description", "icon"], 68 unique_fields=["aaguid"], 69 ) 70 WebAuthnDeviceType.objects.filter(aaguid__in=to_delete).delete() 71 if mds_no != blob.no: 72 cache.set(CACHE_KEY_MDS_NO, blob.no) 73 74 with open(AAGUID_BLOB_PATH, mode="rb") as _raw_blob: 75 entries = loads(_raw_blob.read()) 76 to_create_update = [ 77 WebAuthnDeviceType( 78 aaguid=str(aaguid), description=details.get("name"), icon=details.get("icon_light") 79 ) 80 for aaguid, details in entries.items() 81 ] 82 with atomic(): 83 WebAuthnDeviceType.objects.bulk_create( 84 to_create_update, 85 update_conflicts=True, 86 update_fields=["description", "icon"], 87 unique_fields=["aaguid"], 88 ) 89 90 self.info("Successfully imported FIDO Alliance MDS blobs and AAGUIDs.")
CACHE_KEY_MDS_NO =
'goauthentik.io/stages/authenticator_webauthn/mds_no'
AAGUID_BLOB_PATH =
PosixPath('/home/runner/work/authentik/authentik/authentik/stages/authenticator_webauthn/mds/aaguid.json')
MDS_BLOB_PATH =
PosixPath('/home/runner/work/authentik/authentik/authentik/stages/authenticator_webauthn/mds/blob.jwt')
MDS_CA_PATH =
PosixPath('/home/runner/work/authentik/authentik/authentik/stages/authenticator_webauthn/mds/root-r3.crt')
@lru_cache
def
mds_ca() -> bytes:
26@lru_cache 27def mds_ca() -> bytes: 28 """Cache MDS Signature CA, GlobalSign Root CA - R3""" 29 with open(MDS_CA_PATH, mode="rb") as _raw_root: 30 return _raw_root.read()
Cache MDS Signature CA, GlobalSign Root CA - R3
webauthn_mds_import =
Actor(<function webauthn_mds_import>, queue_name='default', actor_name='webauthn_mds_import')
Background task to import FIDO Alliance MDS blob and AAGUIDs into database