authentik.enterprise.license

Enterprise license

  1"""Enterprise license"""
  2
  3from base64 import b64decode
  4from binascii import Error
  5from dataclasses import asdict, dataclass, field
  6from datetime import UTC, datetime, timedelta
  7from enum import Enum
  8from functools import lru_cache
  9from time import mktime
 10
 11from cryptography.exceptions import InvalidSignature
 12from cryptography.x509 import Certificate, load_der_x509_certificate, load_pem_x509_certificate
 13from dacite import DaciteError, from_dict
 14from django.core.cache import cache
 15from django.db.models.query import QuerySet
 16from django.utils.timezone import now
 17from jwt import PyJWTError, decode, get_unverified_header
 18from jwt.algorithms import ECAlgorithm
 19from rest_framework.exceptions import ValidationError
 20from rest_framework.fields import (
 21    ChoiceField,
 22    DateTimeField,
 23    IntegerField,
 24    ListField,
 25)
 26
 27from authentik.core.api.utils import PassiveSerializer
 28from authentik.core.models import User, UserTypes
 29from authentik.enterprise.models import (
 30    THRESHOLD_READ_ONLY_WEEKS,
 31    THRESHOLD_WARNING_ADMIN_WEEKS,
 32    THRESHOLD_WARNING_EXPIRY_WEEKS,
 33    THRESHOLD_WARNING_USER_WEEKS,
 34    License,
 35    LicenseUsage,
 36    LicenseUsageStatus,
 37)
 38from authentik.tenants.utils import get_unique_identifier
 39
# Cache key under which the most recently recorded license summary is stored.
CACHE_KEY_ENTERPRISE_LICENSE = "goauthentik.io/enterprise/license"
# Lifetime of the cached license summary, in seconds.
CACHE_EXPIRY_ENTERPRISE_LICENSE = 3 * 60 * 60  # 3 hours
 42
 43
 44@lru_cache
 45def get_licensing_key() -> Certificate:
 46    """Get Root CA PEM"""
 47    with open("authentik/enterprise/public.pem", "rb") as _key:
 48        return load_pem_x509_certificate(_key.read())
 49
 50
 51def get_license_aud() -> str:
 52    """Get the JWT audience field"""
 53    return f"enterprise.goauthentik.io/license/{get_unique_identifier()}"
 54
 55
class LicenseFlags(Enum):
    """Flags that can be attached to a license.

    The member values are the strings accepted by the ``license_flags`` field of
    ``LicenseSummarySerializer``.
    """

    TRIAL = "trial"
    NON_PRODUCTION = "non_production"
 61
 62
@dataclass
class LicenseSummary:
    """Internal representation of a license summary.

    Built by ``LicenseKey.summary()`` and stored in the cache (as a dict) by
    ``LicenseKey.record_usage()``.
    """

    # User limits copied from the license key (not the current user counts).
    internal_users: int
    external_users: int
    # Result of ``LicenseKey.status()`` at the time the summary was built.
    status: LicenseUsageStatus
    # The license key's ``exp`` claim converted to a datetime.
    latest_valid: datetime
    # Flags copied from the license key.
    license_flags: list[LicenseFlags]
 72
 73
class LicenseSummarySerializer(PassiveSerializer):
    """Serializer for license status.

    Declares one field per attribute of ``LicenseSummary``.
    """

    internal_users = IntegerField(required=True)
    external_users = IntegerField(required=True)
    status = ChoiceField(choices=LicenseUsageStatus.choices)
    latest_valid = DateTimeField()
    # Flags are serialized by their enum values ("trial", "non_production").
    license_flags = ListField(child=ChoiceField(choices=tuple(x.value for x in LicenseFlags)))
 82
 83
 84@dataclass
 85class LicenseKey:
 86    """License JWT claims"""
 87
 88    aud: str
 89    exp: int
 90
 91    name: str
 92    internal_users: int = 0
 93    external_users: int = 0
 94    license_flags: list[LicenseFlags] = field(default_factory=list)
 95
 96    @staticmethod
 97    def validate(jwt: str, check_expiry=True) -> LicenseKey:
 98        """Validate the license from a given JWT"""
 99        try:
100            headers = get_unverified_header(jwt)
101        except PyJWTError:
102            raise ValidationError("Unable to verify license") from None
103        x5c: list[str] = headers.get("x5c", [])
104        if len(x5c) < 1:
105            raise ValidationError("Unable to verify license")
106        try:
107            our_cert = load_der_x509_certificate(b64decode(x5c[0]))
108            intermediate = load_der_x509_certificate(b64decode(x5c[1]))
109            our_cert.verify_directly_issued_by(intermediate)
110            intermediate.verify_directly_issued_by(get_licensing_key())
111        except InvalidSignature, TypeError, ValueError, Error:
112            raise ValidationError("Unable to verify license") from None
113        _validate_curve_original = ECAlgorithm._validate_curve
114        try:
115            # authentik's license are generated with `algorithm="ES512"` and signed with
116            # a key of curve `secp384r1`. Starting with version 2.11.0, pyjwt enforces the spec, see
117            # https://github.com/jpadilla/pyjwt/commit/5b8622773358e56d3d3c0a9acf404809ff34433a
118            # authentik will change its license generation to `algorithm="ES384"` in 2026.
119            # TODO: remove this when the last incompatible license runs out.
120            ECAlgorithm._validate_curve = lambda *_: True
121            body = from_dict(
122                LicenseKey,
123                decode(
124                    jwt,
125                    our_cert.public_key(),
126                    algorithms=["ES384", "ES512"],
127                    audience=get_license_aud(),
128                    options={"verify_exp": check_expiry, "verify_signature": check_expiry},
129                ),
130            )
131        except PyJWTError:
132            unverified = decode(jwt, options={"verify_signature": False})
133            if unverified["aud"] != get_license_aud():
134                raise ValidationError("Invalid Install ID in license") from None
135            raise ValidationError("Unable to verify license") from None
136        finally:
137            ECAlgorithm._validate_curve = _validate_curve_original
138        return body
139
140    @staticmethod
141    def get_total() -> LicenseKey:
142        """Get a summarized version of all (not expired) licenses"""
143        total = LicenseKey(get_license_aud(), 0, "Summarized license", 0, 0)
144        for lic in License.objects.all():
145            if lic.is_valid:
146                total.internal_users += lic.internal_users
147                total.external_users += lic.external_users
148                total.license_flags.extend(lic.status.license_flags)
149            exp_ts = int(mktime(lic.expiry.timetuple()))
150            if total.exp == 0:
151                total.exp = exp_ts
152            total.exp = max(total.exp, exp_ts)
153        return total
154
155    @staticmethod
156    def base_user_qs() -> QuerySet:
157        """Base query set for all users"""
158        return User.objects.all().exclude_anonymous().exclude(is_active=False)
159
160    @staticmethod
161    def get_internal_user_count():
162        """Get current default user count"""
163        return LicenseKey.base_user_qs().filter(type=UserTypes.INTERNAL).count()
164
165    @staticmethod
166    def get_external_user_count():
167        """Get current external user count"""
168        return LicenseKey.base_user_qs().filter(type=UserTypes.EXTERNAL).count()
169
170    def _last_valid_date(self):
171        last_valid_date = (
172            LicenseUsage.objects.order_by("-record_date")
173            .filter(status=LicenseUsageStatus.VALID)
174            .first()
175        )
176        if not last_valid_date:
177            return datetime.fromtimestamp(0, UTC)
178        return last_valid_date.record_date
179
180    def status(self) -> LicenseUsageStatus:
181        """Check if the given license body covers all users, and is valid."""
182        last_valid = self._last_valid_date()
183        if self.exp == 0 and not License.objects.exists():
184            return LicenseUsageStatus.UNLICENSED
185        _now = now()
186        # Check limit-exceeded based status
187        internal_users = self.get_internal_user_count()
188        external_users = self.get_external_user_count()
189        if internal_users > self.internal_users or external_users > self.external_users:
190            if last_valid < _now - timedelta(weeks=THRESHOLD_READ_ONLY_WEEKS):
191                return LicenseUsageStatus.READ_ONLY
192            if last_valid < _now - timedelta(weeks=THRESHOLD_WARNING_USER_WEEKS):
193                return LicenseUsageStatus.LIMIT_EXCEEDED_USER
194            if last_valid < _now - timedelta(weeks=THRESHOLD_WARNING_ADMIN_WEEKS):
195                return LicenseUsageStatus.LIMIT_EXCEEDED_ADMIN
196        # Check expiry based status
197        if datetime.fromtimestamp(self.exp, UTC) < _now:
198            if datetime.fromtimestamp(self.exp, UTC) < _now - timedelta(
199                weeks=THRESHOLD_READ_ONLY_WEEKS
200            ):
201                return LicenseUsageStatus.READ_ONLY
202            return LicenseUsageStatus.EXPIRED
203        # Expiry warning
204        if datetime.fromtimestamp(self.exp, UTC) <= _now + timedelta(
205            weeks=THRESHOLD_WARNING_EXPIRY_WEEKS
206        ):
207            return LicenseUsageStatus.EXPIRY_SOON
208        return LicenseUsageStatus.VALID
209
210    def record_usage(self):
211        """Capture the current validity status and metrics and save them"""
212        threshold = now() - timedelta(hours=8)
213        usage = (
214            LicenseUsage.objects.order_by("-record_date").filter(record_date__gte=threshold).first()
215        )
216        if not usage:
217            usage = LicenseUsage.objects.create(
218                internal_user_count=self.get_internal_user_count(),
219                external_user_count=self.get_external_user_count(),
220                status=self.status(),
221            )
222        summary = asdict(self.summary())
223        # Also cache the latest summary for the middleware
224        cache.set(CACHE_KEY_ENTERPRISE_LICENSE, summary, timeout=CACHE_EXPIRY_ENTERPRISE_LICENSE)
225        return usage
226
227    def summary(self) -> LicenseSummary:
228        """Summary of license status"""
229        status = self.status()
230        latest_valid = datetime.fromtimestamp(self.exp).replace(tzinfo=UTC)
231        return LicenseSummary(
232            latest_valid=latest_valid,
233            internal_users=self.internal_users,
234            external_users=self.external_users,
235            status=status,
236            license_flags=self.license_flags,
237        )
238
239    @staticmethod
240    def cached_summary() -> LicenseSummary:
241        """Helper method which looks up the last summary"""
242        summary = cache.get(CACHE_KEY_ENTERPRISE_LICENSE)
243        if not summary:
244            return LicenseKey.get_total().summary()
245        try:
246            return from_dict(LicenseSummary, summary)
247        except DaciteError:
248            cache.delete(CACHE_KEY_ENTERPRISE_LICENSE)
249            return LicenseKey.get_total().summary()
CACHE_KEY_ENTERPRISE_LICENSE = 'goauthentik.io/enterprise/license'
CACHE_EXPIRY_ENTERPRISE_LICENSE = 10800
@lru_cache
def get_licensing_key() -> cryptography.hazmat.bindings._rust.x509.Certificate:
45@lru_cache
46def get_licensing_key() -> Certificate:
47    """Get Root CA PEM"""
48    with open("authentik/enterprise/public.pem", "rb") as _key:
49        return load_pem_x509_certificate(_key.read())

Get Root CA PEM

def get_license_aud() -> str:
52def get_license_aud() -> str:
53    """Get the JWT audience field"""
54    return f"enterprise.goauthentik.io/license/{get_unique_identifier()}"

Get the JWT audience field

class LicenseFlags(enum.Enum):
57class LicenseFlags(Enum):
58    """License flags"""
59
60    TRIAL = "trial"
61    NON_PRODUCTION = "non_production"

License flags

TRIAL = <LicenseFlags.TRIAL: 'trial'>
NON_PRODUCTION = <LicenseFlags.NON_PRODUCTION: 'non_production'>
@dataclass
class LicenseSummary:
64@dataclass
65class LicenseSummary:
66    """Internal representation of a license summary"""
67
68    internal_users: int
69    external_users: int
70    status: LicenseUsageStatus
71    latest_valid: datetime
72    license_flags: list[LicenseFlags]

Internal representation of a license summary

LicenseSummary( internal_users: int, external_users: int, status: authentik.enterprise.models.LicenseUsageStatus, latest_valid: datetime.datetime, license_flags: list[LicenseFlags])
internal_users: int
external_users: int
latest_valid: datetime.datetime
license_flags: list[LicenseFlags]
class LicenseSummarySerializer(authentik.core.api.utils.PassiveSerializer):
75class LicenseSummarySerializer(PassiveSerializer):
76    """Serializer for license status"""
77
78    internal_users = IntegerField(required=True)
79    external_users = IntegerField(required=True)
80    status = ChoiceField(choices=LicenseUsageStatus.choices)
81    latest_valid = DateTimeField()
82    license_flags = ListField(child=ChoiceField(choices=tuple(x.value for x in LicenseFlags)))

Serializer for license status

internal_users
external_users
status
latest_valid
license_flags
@dataclass
class LicenseKey:
 85@dataclass
 86class LicenseKey:
 87    """License JWT claims"""
 88
 89    aud: str
 90    exp: int
 91
 92    name: str
 93    internal_users: int = 0
 94    external_users: int = 0
 95    license_flags: list[LicenseFlags] = field(default_factory=list)
 96
 97    @staticmethod
 98    def validate(jwt: str, check_expiry=True) -> LicenseKey:
 99        """Validate the license from a given JWT"""
100        try:
101            headers = get_unverified_header(jwt)
102        except PyJWTError:
103            raise ValidationError("Unable to verify license") from None
104        x5c: list[str] = headers.get("x5c", [])
105        if len(x5c) < 1:
106            raise ValidationError("Unable to verify license")
107        try:
108            our_cert = load_der_x509_certificate(b64decode(x5c[0]))
109            intermediate = load_der_x509_certificate(b64decode(x5c[1]))
110            our_cert.verify_directly_issued_by(intermediate)
111            intermediate.verify_directly_issued_by(get_licensing_key())
112        except InvalidSignature, TypeError, ValueError, Error:
113            raise ValidationError("Unable to verify license") from None
114        _validate_curve_original = ECAlgorithm._validate_curve
115        try:
116            # authentik's license are generated with `algorithm="ES512"` and signed with
117            # a key of curve `secp384r1`. Starting with version 2.11.0, pyjwt enforces the spec, see
118            # https://github.com/jpadilla/pyjwt/commit/5b8622773358e56d3d3c0a9acf404809ff34433a
119            # authentik will change its license generation to `algorithm="ES384"` in 2026.
120            # TODO: remove this when the last incompatible license runs out.
121            ECAlgorithm._validate_curve = lambda *_: True
122            body = from_dict(
123                LicenseKey,
124                decode(
125                    jwt,
126                    our_cert.public_key(),
127                    algorithms=["ES384", "ES512"],
128                    audience=get_license_aud(),
129                    options={"verify_exp": check_expiry, "verify_signature": check_expiry},
130                ),
131            )
132        except PyJWTError:
133            unverified = decode(jwt, options={"verify_signature": False})
134            if unverified["aud"] != get_license_aud():
135                raise ValidationError("Invalid Install ID in license") from None
136            raise ValidationError("Unable to verify license") from None
137        finally:
138            ECAlgorithm._validate_curve = _validate_curve_original
139        return body
140
141    @staticmethod
142    def get_total() -> LicenseKey:
143        """Get a summarized version of all (not expired) licenses"""
144        total = LicenseKey(get_license_aud(), 0, "Summarized license", 0, 0)
145        for lic in License.objects.all():
146            if lic.is_valid:
147                total.internal_users += lic.internal_users
148                total.external_users += lic.external_users
149                total.license_flags.extend(lic.status.license_flags)
150            exp_ts = int(mktime(lic.expiry.timetuple()))
151            if total.exp == 0:
152                total.exp = exp_ts
153            total.exp = max(total.exp, exp_ts)
154        return total
155
156    @staticmethod
157    def base_user_qs() -> QuerySet:
158        """Base query set for all users"""
159        return User.objects.all().exclude_anonymous().exclude(is_active=False)
160
161    @staticmethod
162    def get_internal_user_count():
163        """Get current default user count"""
164        return LicenseKey.base_user_qs().filter(type=UserTypes.INTERNAL).count()
165
166    @staticmethod
167    def get_external_user_count():
168        """Get current external user count"""
169        return LicenseKey.base_user_qs().filter(type=UserTypes.EXTERNAL).count()
170
171    def _last_valid_date(self):
172        last_valid_date = (
173            LicenseUsage.objects.order_by("-record_date")
174            .filter(status=LicenseUsageStatus.VALID)
175            .first()
176        )
177        if not last_valid_date:
178            return datetime.fromtimestamp(0, UTC)
179        return last_valid_date.record_date
180
181    def status(self) -> LicenseUsageStatus:
182        """Check if the given license body covers all users, and is valid."""
183        last_valid = self._last_valid_date()
184        if self.exp == 0 and not License.objects.exists():
185            return LicenseUsageStatus.UNLICENSED
186        _now = now()
187        # Check limit-exceeded based status
188        internal_users = self.get_internal_user_count()
189        external_users = self.get_external_user_count()
190        if internal_users > self.internal_users or external_users > self.external_users:
191            if last_valid < _now - timedelta(weeks=THRESHOLD_READ_ONLY_WEEKS):
192                return LicenseUsageStatus.READ_ONLY
193            if last_valid < _now - timedelta(weeks=THRESHOLD_WARNING_USER_WEEKS):
194                return LicenseUsageStatus.LIMIT_EXCEEDED_USER
195            if last_valid < _now - timedelta(weeks=THRESHOLD_WARNING_ADMIN_WEEKS):
196                return LicenseUsageStatus.LIMIT_EXCEEDED_ADMIN
197        # Check expiry based status
198        if datetime.fromtimestamp(self.exp, UTC) < _now:
199            if datetime.fromtimestamp(self.exp, UTC) < _now - timedelta(
200                weeks=THRESHOLD_READ_ONLY_WEEKS
201            ):
202                return LicenseUsageStatus.READ_ONLY
203            return LicenseUsageStatus.EXPIRED
204        # Expiry warning
205        if datetime.fromtimestamp(self.exp, UTC) <= _now + timedelta(
206            weeks=THRESHOLD_WARNING_EXPIRY_WEEKS
207        ):
208            return LicenseUsageStatus.EXPIRY_SOON
209        return LicenseUsageStatus.VALID
210
211    def record_usage(self):
212        """Capture the current validity status and metrics and save them"""
213        threshold = now() - timedelta(hours=8)
214        usage = (
215            LicenseUsage.objects.order_by("-record_date").filter(record_date__gte=threshold).first()
216        )
217        if not usage:
218            usage = LicenseUsage.objects.create(
219                internal_user_count=self.get_internal_user_count(),
220                external_user_count=self.get_external_user_count(),
221                status=self.status(),
222            )
223        summary = asdict(self.summary())
224        # Also cache the latest summary for the middleware
225        cache.set(CACHE_KEY_ENTERPRISE_LICENSE, summary, timeout=CACHE_EXPIRY_ENTERPRISE_LICENSE)
226        return usage
227
228    def summary(self) -> LicenseSummary:
229        """Summary of license status"""
230        status = self.status()
231        latest_valid = datetime.fromtimestamp(self.exp).replace(tzinfo=UTC)
232        return LicenseSummary(
233            latest_valid=latest_valid,
234            internal_users=self.internal_users,
235            external_users=self.external_users,
236            status=status,
237            license_flags=self.license_flags,
238        )
239
240    @staticmethod
241    def cached_summary() -> LicenseSummary:
242        """Helper method which looks up the last summary"""
243        summary = cache.get(CACHE_KEY_ENTERPRISE_LICENSE)
244        if not summary:
245            return LicenseKey.get_total().summary()
246        try:
247            return from_dict(LicenseSummary, summary)
248        except DaciteError:
249            cache.delete(CACHE_KEY_ENTERPRISE_LICENSE)
250            return LicenseKey.get_total().summary()

License JWT claims

LicenseKey( aud: str, exp: int, name: str, internal_users: int = 0, external_users: int = 0, license_flags: list[LicenseFlags] = <factory>)
aud: str
exp: int
name: str
internal_users: int = 0
external_users: int = 0
license_flags: list[LicenseFlags]
@staticmethod
def validate(jwt: str, check_expiry=True) -> LicenseKey:
 97    @staticmethod
 98    def validate(jwt: str, check_expiry=True) -> LicenseKey:
 99        """Validate the license from a given JWT"""
100        try:
101            headers = get_unverified_header(jwt)
102        except PyJWTError:
103            raise ValidationError("Unable to verify license") from None
104        x5c: list[str] = headers.get("x5c", [])
105        if len(x5c) < 1:
106            raise ValidationError("Unable to verify license")
107        try:
108            our_cert = load_der_x509_certificate(b64decode(x5c[0]))
109            intermediate = load_der_x509_certificate(b64decode(x5c[1]))
110            our_cert.verify_directly_issued_by(intermediate)
111            intermediate.verify_directly_issued_by(get_licensing_key())
112        except InvalidSignature, TypeError, ValueError, Error:
113            raise ValidationError("Unable to verify license") from None
114        _validate_curve_original = ECAlgorithm._validate_curve
115        try:
116            # authentik's license are generated with `algorithm="ES512"` and signed with
117            # a key of curve `secp384r1`. Starting with version 2.11.0, pyjwt enforces the spec, see
118            # https://github.com/jpadilla/pyjwt/commit/5b8622773358e56d3d3c0a9acf404809ff34433a
119            # authentik will change its license generation to `algorithm="ES384"` in 2026.
120            # TODO: remove this when the last incompatible license runs out.
121            ECAlgorithm._validate_curve = lambda *_: True
122            body = from_dict(
123                LicenseKey,
124                decode(
125                    jwt,
126                    our_cert.public_key(),
127                    algorithms=["ES384", "ES512"],
128                    audience=get_license_aud(),
129                    options={"verify_exp": check_expiry, "verify_signature": check_expiry},
130                ),
131            )
132        except PyJWTError:
133            unverified = decode(jwt, options={"verify_signature": False})
134            if unverified["aud"] != get_license_aud():
135                raise ValidationError("Invalid Install ID in license") from None
136            raise ValidationError("Unable to verify license") from None
137        finally:
138            ECAlgorithm._validate_curve = _validate_curve_original
139        return body

Validate the license from a given JWT

@staticmethod
def get_total() -> LicenseKey:
141    @staticmethod
142    def get_total() -> LicenseKey:
143        """Get a summarized version of all (not expired) licenses"""
144        total = LicenseKey(get_license_aud(), 0, "Summarized license", 0, 0)
145        for lic in License.objects.all():
146            if lic.is_valid:
147                total.internal_users += lic.internal_users
148                total.external_users += lic.external_users
149                total.license_flags.extend(lic.status.license_flags)
150            exp_ts = int(mktime(lic.expiry.timetuple()))
151            if total.exp == 0:
152                total.exp = exp_ts
153            total.exp = max(total.exp, exp_ts)
154        return total

Get a summarized version of all (not expired) licenses

@staticmethod
def base_user_qs() -> django.db.models.query.QuerySet:
156    @staticmethod
157    def base_user_qs() -> QuerySet:
158        """Base query set for all users"""
159        return User.objects.all().exclude_anonymous().exclude(is_active=False)

Base query set for all users

@staticmethod
def get_internal_user_count():
161    @staticmethod
162    def get_internal_user_count():
163        """Get current default user count"""
164        return LicenseKey.base_user_qs().filter(type=UserTypes.INTERNAL).count()

Get current internal user count

@staticmethod
def get_external_user_count():
166    @staticmethod
167    def get_external_user_count():
168        """Get current external user count"""
169        return LicenseKey.base_user_qs().filter(type=UserTypes.EXTERNAL).count()

Get current external user count

def status(self) -> authentik.enterprise.models.LicenseUsageStatus:
181    def status(self) -> LicenseUsageStatus:
182        """Check if the given license body covers all users, and is valid."""
183        last_valid = self._last_valid_date()
184        if self.exp == 0 and not License.objects.exists():
185            return LicenseUsageStatus.UNLICENSED
186        _now = now()
187        # Check limit-exceeded based status
188        internal_users = self.get_internal_user_count()
189        external_users = self.get_external_user_count()
190        if internal_users > self.internal_users or external_users > self.external_users:
191            if last_valid < _now - timedelta(weeks=THRESHOLD_READ_ONLY_WEEKS):
192                return LicenseUsageStatus.READ_ONLY
193            if last_valid < _now - timedelta(weeks=THRESHOLD_WARNING_USER_WEEKS):
194                return LicenseUsageStatus.LIMIT_EXCEEDED_USER
195            if last_valid < _now - timedelta(weeks=THRESHOLD_WARNING_ADMIN_WEEKS):
196                return LicenseUsageStatus.LIMIT_EXCEEDED_ADMIN
197        # Check expiry based status
198        if datetime.fromtimestamp(self.exp, UTC) < _now:
199            if datetime.fromtimestamp(self.exp, UTC) < _now - timedelta(
200                weeks=THRESHOLD_READ_ONLY_WEEKS
201            ):
202                return LicenseUsageStatus.READ_ONLY
203            return LicenseUsageStatus.EXPIRED
204        # Expiry warning
205        if datetime.fromtimestamp(self.exp, UTC) <= _now + timedelta(
206            weeks=THRESHOLD_WARNING_EXPIRY_WEEKS
207        ):
208            return LicenseUsageStatus.EXPIRY_SOON
209        return LicenseUsageStatus.VALID

Check if the given license body covers all users, and is valid.

def record_usage(self):
211    def record_usage(self):
212        """Capture the current validity status and metrics and save them"""
213        threshold = now() - timedelta(hours=8)
214        usage = (
215            LicenseUsage.objects.order_by("-record_date").filter(record_date__gte=threshold).first()
216        )
217        if not usage:
218            usage = LicenseUsage.objects.create(
219                internal_user_count=self.get_internal_user_count(),
220                external_user_count=self.get_external_user_count(),
221                status=self.status(),
222            )
223        summary = asdict(self.summary())
224        # Also cache the latest summary for the middleware
225        cache.set(CACHE_KEY_ENTERPRISE_LICENSE, summary, timeout=CACHE_EXPIRY_ENTERPRISE_LICENSE)
226        return usage

Capture the current validity status and metrics and save them

def summary(self) -> LicenseSummary:
228    def summary(self) -> LicenseSummary:
229        """Summary of license status"""
230        status = self.status()
231        latest_valid = datetime.fromtimestamp(self.exp).replace(tzinfo=UTC)
232        return LicenseSummary(
233            latest_valid=latest_valid,
234            internal_users=self.internal_users,
235            external_users=self.external_users,
236            status=status,
237            license_flags=self.license_flags,
238        )

Summary of license status

@staticmethod
def cached_summary() -> LicenseSummary:
240    @staticmethod
241    def cached_summary() -> LicenseSummary:
242        """Helper method which looks up the last summary"""
243        summary = cache.get(CACHE_KEY_ENTERPRISE_LICENSE)
244        if not summary:
245            return LicenseKey.get_total().summary()
246        try:
247            return from_dict(LicenseSummary, summary)
248        except DaciteError:
249            cache.delete(CACHE_KEY_ENTERPRISE_LICENSE)
250            return LicenseKey.get_total().summary()

Helper method which looks up the last summary