authentik.enterprise.endpoints.connectors.agent.http
from base64 import urlsafe_b64encode
from json import dumps
from secrets import token_bytes

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.ciphers import Cipher
from cryptography.hazmat.primitives.ciphers.algorithms import AES
from cryptography.hazmat.primitives.ciphers.modes import GCM
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
from django.http import HttpResponse
from jwcrypto.common import base64url_decode, base64url_encode

from authentik.endpoints.connectors.agent.models import AgentDeviceConnection


def length_prefixed(data: bytes) -> bytes:
    """Return ``data`` preceded by its length as a 4-byte big-endian integer."""
    length = len(data)
    return length.to_bytes(4, "big") + data


def build_apu(public_key: ec.EllipticCurvePublicKey):
    """Build the ``apu`` value for the JWE header from the ephemeral public key.

    The result is the length-prefixed label ``b"APPLE"`` followed by the
    length-prefixed X9.63 encoding of ``public_key``.
    """
    # X9.63 representation: 0x04 || X || Y
    public_numbers = public_key.public_numbers()

    x_bytes = public_numbers.x.to_bytes(32, "big")
    y_bytes = public_numbers.y.to_bytes(32, "big")

    x963 = bytes([0x04]) + x_bytes + y_bytes

    return length_prefixed(b"APPLE") + length_prefixed(x963)


def encrypt_token_with_a256_gcm(body: dict, device_encryption_key: str, apv: bytes) -> str:
    """Encrypt ``body`` as a compact JWE using ECDH-ES key agreement and A256GCM.

    A fresh P-256 key pair is generated for every call. The content encryption
    key is derived from the ECDH shared secret with the SHA-256 Concat KDF.

    Args:
        body: Payload to encrypt; it is serialised with ``json.dumps``.
        device_encryption_key: PEM-encoded public key of the receiving device.
        apv: Raw PartyVInfo bytes; placed (base64url) in the header and mixed
            into the key derivation.

    Returns:
        The compact serialization ``header..iv.ciphertext.tag``. The encrypted
        key segment is empty because ECDH-ES derives the key directly.

    Exceptions raised while loading the key or performing the key exchange
    propagate unchanged.
    """
    ephemeral_key = ec.generate_private_key(curve=ec.SECP256R1())
    device_public_key = serialization.load_pem_public_key(
        device_encryption_key.encode(), backend=default_backend()
    )

    shared_secret_z = ephemeral_key.exchange(ec.ECDH(), device_public_key)

    ephemeral_public_key = ephemeral_key.public_key()
    ephemeral_numbers = ephemeral_public_key.public_numbers()
    apu = build_apu(ephemeral_public_key)

    jwe_header = {
        "enc": "A256GCM",
        "kid": "ephemeralKey",
        "epk": {
            "x": base64url_encode(ephemeral_numbers.x.to_bytes(32, "big")),
            "y": base64url_encode(ephemeral_numbers.y.to_bytes(32, "big")),
            "kty": "EC",
            "crv": "P-256",
        },
        "typ": "platformsso-login-response+jwt",
        "alg": "ECDH-ES",
        "apu": base64url_encode(apu),
        "apv": base64url_encode(apv),
    }

    # Concat KDF OtherInfo: AlgorithmID || PartyUInfo || PartyVInfo || SuppPubInfo,
    # where SuppPubInfo is the derived key length in bits.
    party_u_info = length_prefixed(apu)
    party_v_info = length_prefixed(apv)
    supp_pub_info = (256).to_bytes(4, "big")

    other_info = length_prefixed(b"A256GCM") + party_u_info + party_v_info + supp_pub_info

    ckdf = ConcatKDFHash(
        algorithm=hashes.SHA256(),
        length=32,
        otherinfo=other_info,
    )

    derived_key = ckdf.derive(shared_secret_z)

    # A256GCM requires a 96-bit IV, i.e. 12 bytes (token_bytes takes a byte count).
    nonce = token_bytes(12)

    header_json = dumps(jwe_header, separators=(",", ":")).encode()
    # The base64url-encoded protected header doubles as the AAD.
    protected_b64 = urlsafe_b64encode(header_json).rstrip(b"=")

    cipher = Cipher(AES(derived_key), GCM(nonce))
    encryptor = cipher.encryptor()
    encryptor.authenticate_additional_data(protected_b64)
    ciphertext = encryptor.update(dumps(body).encode()) + encryptor.finalize()

    # base64url encoding
    iv_b64 = urlsafe_b64encode(nonce).rstrip(b"=")
    ciphertext_b64 = urlsafe_b64encode(ciphertext).rstrip(b"=")
    tag_b64 = urlsafe_b64encode(encryptor.tag).rstrip(b"=")

    jwe_compact = b".".join(
        [
            protected_b64,
            b"",
            iv_b64,
            ciphertext_b64,
            tag_b64,
        ]
    )
    return jwe_compact.decode()


class JWEResponse(HttpResponse):
    """HTTP response whose body is ``data`` encrypted as a compact JWE for ``device``."""

    def __init__(
        self,
        data: dict,
        device: AgentDeviceConnection,
        apv: str,
    ):
        super().__init__(
            content=encrypt_token_with_a256_gcm(
                data, device.apple_encryption_key, base64url_decode(apv)
            ),
            content_type="application/platformsso-login-response+jwt",
        )
def
length_prefixed(data: bytes) -> bytes:
def
build_apu( public_key: cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey):
def build_apu(public_key: ec.EllipticCurvePublicKey):
    """Build the ``apu`` value from an elliptic-curve public key.

    The result is the length-prefixed label ``b"APPLE"`` followed by the
    length-prefixed X9.63 encoding of ``public_key``.
    """
    numbers = public_key.public_numbers()

    # X9.63 uncompressed point: 0x04 || X || Y, each coordinate 32 bytes big-endian
    encoded_point = b"".join(
        (
            bytes([0x04]),
            numbers.x.to_bytes(32, "big"),
            numbers.y.to_bytes(32, "big"),
        )
    )

    label = length_prefixed(b"APPLE")
    return label + length_prefixed(encoded_point)
def
encrypt_token_with_a256_gcm(body: dict, device_encryption_key: str, apv: bytes) -> str:
def encrypt_token_with_a256_gcm(body: dict, device_encryption_key: str, apv: bytes) -> str:
    """Encrypt ``body`` as a compact JWE using ECDH-ES key agreement and A256GCM.

    A fresh P-256 key pair is generated for every call. The content encryption
    key is derived from the ECDH shared secret with the SHA-256 Concat KDF.

    Args:
        body: Payload to encrypt; it is serialised with ``json.dumps``.
        device_encryption_key: PEM-encoded public key of the receiving device.
        apv: Raw PartyVInfo bytes; placed (base64url) in the header and mixed
            into the key derivation.

    Returns:
        The compact serialization ``header..iv.ciphertext.tag``. The encrypted
        key segment is empty because ECDH-ES derives the key directly.

    Exceptions raised while loading the key or performing the key exchange
    propagate unchanged.
    """
    ephemeral_key = ec.generate_private_key(curve=ec.SECP256R1())
    device_public_key = serialization.load_pem_public_key(
        device_encryption_key.encode(), backend=default_backend()
    )

    shared_secret_z = ephemeral_key.exchange(ec.ECDH(), device_public_key)

    ephemeral_public_key = ephemeral_key.public_key()
    ephemeral_numbers = ephemeral_public_key.public_numbers()
    apu = build_apu(ephemeral_public_key)

    jwe_header = {
        "enc": "A256GCM",
        "kid": "ephemeralKey",
        "epk": {
            "x": base64url_encode(ephemeral_numbers.x.to_bytes(32, "big")),
            "y": base64url_encode(ephemeral_numbers.y.to_bytes(32, "big")),
            "kty": "EC",
            "crv": "P-256",
        },
        "typ": "platformsso-login-response+jwt",
        "alg": "ECDH-ES",
        "apu": base64url_encode(apu),
        "apv": base64url_encode(apv),
    }

    # Concat KDF OtherInfo: AlgorithmID || PartyUInfo || PartyVInfo || SuppPubInfo,
    # where SuppPubInfo is the derived key length in bits.
    party_u_info = length_prefixed(apu)
    party_v_info = length_prefixed(apv)
    supp_pub_info = (256).to_bytes(4, "big")

    other_info = length_prefixed(b"A256GCM") + party_u_info + party_v_info + supp_pub_info

    ckdf = ConcatKDFHash(
        algorithm=hashes.SHA256(),
        length=32,
        otherinfo=other_info,
    )

    derived_key = ckdf.derive(shared_secret_z)

    # A256GCM requires a 96-bit IV, i.e. 12 bytes (token_bytes takes a byte count).
    nonce = token_bytes(12)

    header_json = dumps(jwe_header, separators=(",", ":")).encode()
    # The base64url-encoded protected header doubles as the AAD.
    protected_b64 = urlsafe_b64encode(header_json).rstrip(b"=")

    cipher = Cipher(AES(derived_key), GCM(nonce))
    encryptor = cipher.encryptor()
    encryptor.authenticate_additional_data(protected_b64)
    ciphertext = encryptor.update(dumps(body).encode()) + encryptor.finalize()

    # base64url encoding
    iv_b64 = urlsafe_b64encode(nonce).rstrip(b"=")
    ciphertext_b64 = urlsafe_b64encode(ciphertext).rstrip(b"=")
    tag_b64 = urlsafe_b64encode(encryptor.tag).rstrip(b"=")

    jwe_compact = b".".join(
        [
            protected_b64,
            b"",
            iv_b64,
            ciphertext_b64,
            tag_b64,
        ]
    )
    return jwe_compact.decode()
class
JWEResponse(django.http.response.HttpResponse):
class JWEResponse(HttpResponse):
    """HTTP response whose body is ``data`` encrypted as a compact JWE for ``device``.

    ``apv`` is the base64url-encoded PartyVInfo; it is decoded before being
    handed to the encryption routine together with the device's
    ``apple_encryption_key``.
    """

    def __init__(
        self,
        data: dict,
        device: AgentDeviceConnection,
        apv: str,
    ):
        party_v_info = base64url_decode(apv)
        encrypted_token = encrypt_token_with_a256_gcm(
            data, device.apple_encryption_key, party_v_info
        )
        super().__init__(
            content=encrypted_token,
            content_type="application/platformsso-login-response+jwt",
        )
An HTTP response class with a string as content.
This content can be read, appended to, or replaced.
JWEResponse( data: dict, device: authentik.endpoints.connectors.agent.models.AgentDeviceConnection, apv: str)