authentik.enterprise.endpoints.connectors.agent.http

  1from base64 import urlsafe_b64encode
  2from json import dumps
  3from secrets import token_bytes
  4
  5from cryptography.hazmat.backends import default_backend
  6from cryptography.hazmat.primitives import hashes, serialization
  7from cryptography.hazmat.primitives.asymmetric import ec
  8from cryptography.hazmat.primitives.ciphers import Cipher
  9from cryptography.hazmat.primitives.ciphers.algorithms import AES
 10from cryptography.hazmat.primitives.ciphers.modes import GCM
 11from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
 12from django.http import HttpResponse
 13from jwcrypto.common import base64url_decode, base64url_encode
 14
 15from authentik.endpoints.connectors.agent.models import AgentDeviceConnection
 16
 17
 18def length_prefixed(data: bytes) -> bytes:
 19    length = len(data)
 20    return length.to_bytes(4, "big") + data
 21
 22
 23def build_apu(public_key: ec.EllipticCurvePublicKey):
 24    # X9.63 representation: 0x04 || X || Y
 25    public_numbers = public_key.public_numbers()
 26
 27    x_bytes = public_numbers.x.to_bytes(32, "big")
 28    y_bytes = public_numbers.y.to_bytes(32, "big")
 29
 30    x963 = bytes([0x04]) + x_bytes + y_bytes
 31
 32    result = length_prefixed(b"APPLE") + length_prefixed(x963)
 33
 34    return result
 35
 36
 37def encrypt_token_with_a256_gcm(body: dict, device_encryption_key: str, apv: bytes) -> str:
 38    ephemeral_key = ec.generate_private_key(curve=ec.SECP256R1())
 39    device_public_key = serialization.load_pem_public_key(
 40        device_encryption_key.encode(), backend=default_backend()
 41    )
 42
 43    shared_secret_z = ephemeral_key.exchange(ec.ECDH(), device_public_key)
 44
 45    apu = build_apu(ephemeral_key.public_key())
 46
 47    jwe_header = {
 48        "enc": "A256GCM",
 49        "kid": "ephemeralKey",
 50        "epk": {
 51            "x": base64url_encode(
 52                ephemeral_key.public_key().public_numbers().x.to_bytes(32, "big")
 53            ),
 54            "y": base64url_encode(
 55                ephemeral_key.public_key().public_numbers().y.to_bytes(32, "big")
 56            ),
 57            "kty": "EC",
 58            "crv": "P-256",
 59        },
 60        "typ": "platformsso-login-response+jwt",
 61        "alg": "ECDH-ES",
 62        "apu": base64url_encode(apu),
 63        "apv": base64url_encode(apv),
 64    }
 65
 66    party_u_info = length_prefixed(apu)
 67    party_v_info = length_prefixed(apv)
 68    supp_pub_info = (256).to_bytes(4, "big")
 69
 70    other_info = length_prefixed(b"A256GCM") + party_u_info + party_v_info + supp_pub_info
 71
 72    ckdf = ConcatKDFHash(
 73        algorithm=hashes.SHA256(),
 74        length=32,
 75        otherinfo=other_info,
 76    )
 77
 78    derived_key = ckdf.derive(shared_secret_z)
 79
 80    nonce = token_bytes(96)
 81
 82    header_json = dumps(jwe_header, separators=(",", ":")).encode()
 83    aad = urlsafe_b64encode(header_json).rstrip(b"=")
 84
 85    cipher = Cipher(AES(derived_key), GCM(nonce))
 86    encryptor = cipher.encryptor()
 87    encryptor.authenticate_additional_data(aad)
 88    ciphertext = encryptor.update(dumps(body).encode()) + encryptor.finalize()
 89
 90    # base64url encoding
 91    protected_b64 = urlsafe_b64encode(header_json).rstrip(b"=")
 92    iv_b64 = urlsafe_b64encode(nonce).rstrip(b"=")
 93    ciphertext_b64 = urlsafe_b64encode(ciphertext).rstrip(b"=")
 94    tag_b64 = urlsafe_b64encode(encryptor.tag).rstrip(b"=")
 95
 96    jwe_compact = b".".join(
 97        [
 98            protected_b64,
 99            b"",
100            iv_b64,
101            ciphertext_b64,
102            tag_b64,
103        ]
104    )
105    return jwe_compact.decode()
106
107
108class JWEResponse(HttpResponse):
109
110    def __init__(
111        self,
112        data: dict,
113        device: AgentDeviceConnection,
114        apv: str,
115    ):
116        super().__init__(
117            content=encrypt_token_with_a256_gcm(
118                data, device.apple_encryption_key, base64url_decode(apv)
119            ),
120            content_type="application/platformsso-login-response+jwt",
121        )
def length_prefixed(data: bytes) -> bytes:
19def length_prefixed(data: bytes) -> bytes:
20    length = len(data)
21    return length.to_bytes(4, "big") + data
def build_apu( public_key: cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey):
24def build_apu(public_key: ec.EllipticCurvePublicKey):
25    # X9.63 representation: 0x04 || X || Y
26    public_numbers = public_key.public_numbers()
27
28    x_bytes = public_numbers.x.to_bytes(32, "big")
29    y_bytes = public_numbers.y.to_bytes(32, "big")
30
31    x963 = bytes([0x04]) + x_bytes + y_bytes
32
33    result = length_prefixed(b"APPLE") + length_prefixed(x963)
34
35    return result
def encrypt_token_with_a256_gcm(body: dict, device_encryption_key: str, apv: bytes) -> str:
 38def encrypt_token_with_a256_gcm(body: dict, device_encryption_key: str, apv: bytes) -> str:
 39    ephemeral_key = ec.generate_private_key(curve=ec.SECP256R1())
 40    device_public_key = serialization.load_pem_public_key(
 41        device_encryption_key.encode(), backend=default_backend()
 42    )
 43
 44    shared_secret_z = ephemeral_key.exchange(ec.ECDH(), device_public_key)
 45
 46    apu = build_apu(ephemeral_key.public_key())
 47
 48    jwe_header = {
 49        "enc": "A256GCM",
 50        "kid": "ephemeralKey",
 51        "epk": {
 52            "x": base64url_encode(
 53                ephemeral_key.public_key().public_numbers().x.to_bytes(32, "big")
 54            ),
 55            "y": base64url_encode(
 56                ephemeral_key.public_key().public_numbers().y.to_bytes(32, "big")
 57            ),
 58            "kty": "EC",
 59            "crv": "P-256",
 60        },
 61        "typ": "platformsso-login-response+jwt",
 62        "alg": "ECDH-ES",
 63        "apu": base64url_encode(apu),
 64        "apv": base64url_encode(apv),
 65    }
 66
 67    party_u_info = length_prefixed(apu)
 68    party_v_info = length_prefixed(apv)
 69    supp_pub_info = (256).to_bytes(4, "big")
 70
 71    other_info = length_prefixed(b"A256GCM") + party_u_info + party_v_info + supp_pub_info
 72
 73    ckdf = ConcatKDFHash(
 74        algorithm=hashes.SHA256(),
 75        length=32,
 76        otherinfo=other_info,
 77    )
 78
 79    derived_key = ckdf.derive(shared_secret_z)
 80
 81    nonce = token_bytes(96)
 82
 83    header_json = dumps(jwe_header, separators=(",", ":")).encode()
 84    aad = urlsafe_b64encode(header_json).rstrip(b"=")
 85
 86    cipher = Cipher(AES(derived_key), GCM(nonce))
 87    encryptor = cipher.encryptor()
 88    encryptor.authenticate_additional_data(aad)
 89    ciphertext = encryptor.update(dumps(body).encode()) + encryptor.finalize()
 90
 91    # base64url encoding
 92    protected_b64 = urlsafe_b64encode(header_json).rstrip(b"=")
 93    iv_b64 = urlsafe_b64encode(nonce).rstrip(b"=")
 94    ciphertext_b64 = urlsafe_b64encode(ciphertext).rstrip(b"=")
 95    tag_b64 = urlsafe_b64encode(encryptor.tag).rstrip(b"=")
 96
 97    jwe_compact = b".".join(
 98        [
 99            protected_b64,
100            b"",
101            iv_b64,
102            ciphertext_b64,
103            tag_b64,
104        ]
105    )
106    return jwe_compact.decode()
class JWEResponse(django.http.response.HttpResponse):
109class JWEResponse(HttpResponse):
110
111    def __init__(
112        self,
113        data: dict,
114        device: AgentDeviceConnection,
115        apv: str,
116    ):
117        super().__init__(
118            content=encrypt_token_with_a256_gcm(
119                data, device.apple_encryption_key, base64url_decode(apv)
120            ),
121            content_type="application/platformsso-login-response+jwt",
122        )

An HTTP response class with a string as content.

This content can be read, appended to, or replaced.

JWEResponse( data: dict, device: authentik.endpoints.connectors.agent.models.AgentDeviceConnection, apv: str)
111    def __init__(
112        self,
113        data: dict,
114        device: AgentDeviceConnection,
115        apv: str,
116    ):
117        super().__init__(
118            content=encrypt_token_with_a256_gcm(
119                data, device.apple_encryption_key, base64url_decode(apv)
120            ),
121            content_type="application/platformsso-login-response+jwt",
122        )