authentik.root.ws.consumer

websocket Message consumer

 1"""websocket Message consumer"""
 2
 3from hashlib import sha256
 4
 5from asgiref.sync import async_to_sync
 6from channels.generic.websocket import JsonWebsocketConsumer
 7from django.core.cache import cache
 8from django.db import connection
 9
10from authentik.core.models import User
11from authentik.root.ws.storage import CACHE_PREFIX
12
13
def build_session_group(session_key: str):
    """Build the hashed group identifier for a client session.

    The identifier is the SHA-256 hex digest of the active connection's
    schema name joined with ``session_key``.
    """
    plain_name = f"{connection.schema_name}/group_client_session_{str(session_key)}"
    digest = sha256(plain_name.encode())
    return digest.hexdigest()
18
19
def build_device_group(device_id: str):
    """Build the hashed group identifier for a client device.

    The identifier is the SHA-256 hex digest of the active connection's
    schema name joined with ``device_id``.
    """
    plain_name = f"{connection.schema_name}/group_client_device_{str(device_id)}"
    digest = sha256(plain_name.encode())
    return digest.hexdigest()
24
25
def build_user_group(user: User):
    """Build the hashed group identifier for a user.

    The identifier is the SHA-256 hex digest of the active connection's
    schema name joined with the user's ``uuid``.
    """
    plain_name = f"{connection.schema_name}/group_client_user_{user.uuid}"
    digest = sha256(plain_name.encode())
    return digest.hexdigest()
28
29
class MessageConsumer(JsonWebsocketConsumer):
    """Consumer which sends django.contrib.messages Messages over WS.

    On connect, the channel_name is recorded in the cache under the session
    key, and the channel joins the group of the authenticated user and the
    group of the device cookie (each only when present). On disconnect the
    cache entry is removed and the channel leaves those groups again.
    """

    # Defaults to None so disconnect() can run even if connect() failed
    # before the session key was read.
    session_key: str | None = None
    device_cookie: str | None = None
    # Set by connect() once an authenticated user joined their group;
    # disconnect() uses it to leave that same group.
    user: User | None = None

    def connect(self):
        """Accept the socket and register the channel in the cache and groups."""
        self.accept()
        self.session_key = self.scope["session"].session_key
        if self.session_key:
            # timeout=None: the entry stays until disconnect() deletes it
            cache.set(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}", True, None)
        user = self.scope.get("user")
        if user and user.is_authenticated:
            # Remember the user, otherwise disconnect() can never discard
            # this channel from the user group.
            self.user = user
            async_to_sync(self.channel_layer.group_add)(
                build_user_group(user), self.channel_name
            )
        if device_cookie := self.scope["cookies"].get("authentik_device", None):
            self.device_cookie = device_cookie
            async_to_sync(self.channel_layer.group_add)(
                build_device_group(self.device_cookie), self.channel_name
            )

    def disconnect(self, code):
        """Undo everything connect() registered for this channel.

        ``code`` is the close code passed in by the framework; it is not used.
        """
        if self.session_key:
            cache.delete(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}")
        if self.device_cookie:
            async_to_sync(self.channel_layer.group_discard)(
                build_device_group(self.device_cookie), self.channel_name
            )
        if self.user:
            async_to_sync(self.channel_layer.group_discard)(
                build_user_group(self.user), self.channel_name
            )

    def event_message(self, event: dict):
        """Event handler which is called by Messages Storage backend.

        The event is forwarded to the client unchanged.
        """
        self.send_json(event)

    def event_session_authenticated(self, event: dict):
        """Event handler called after user authentication.

        Sends the event with ``message_type`` defaulting to
        ``session.authenticated``; a ``message_type`` key in ``event`` wins.
        """
        self.send_json({"message_type": "session.authenticated", **event})

    def event_notification(self, event: dict):
        """Event handler for new notifications.

        Sends the event with ``message_type`` defaulting to
        ``notification.new``; a ``message_type`` key in ``event`` wins.
        """
        self.send_json({"message_type": "notification.new", **event})
def build_session_group(session_key: str):
def build_session_group(session_key: str):
    """Build the hashed group identifier for a client session.

    The identifier is the SHA-256 hex digest of the active connection's
    schema name joined with ``session_key``.
    """
    plain_name = f"{connection.schema_name}/group_client_session_{str(session_key)}"
    digest = sha256(plain_name.encode())
    return digest.hexdigest()
def build_device_group(device_id: str):
def build_device_group(device_id: str):
    """Build the hashed group identifier for a client device.

    The identifier is the SHA-256 hex digest of the active connection's
    schema name joined with ``device_id``.
    """
    plain_name = f"{connection.schema_name}/group_client_device_{str(device_id)}"
    digest = sha256(plain_name.encode())
    return digest.hexdigest()
def build_user_group(user: authentik.core.models.User):
def build_user_group(user: User):
    """Build the hashed group identifier for a user.

    The identifier is the SHA-256 hex digest of the active connection's
    schema name joined with the user's ``uuid``.
    """
    plain_name = f"{connection.schema_name}/group_client_user_{user.uuid}"
    digest = sha256(plain_name.encode())
    return digest.hexdigest()
class MessageConsumer(channels.generic.websocket.JsonWebsocketConsumer):
class MessageConsumer(JsonWebsocketConsumer):
    """Consumer which sends django.contrib.messages Messages over WS.

    On connect, the channel_name is recorded in the cache under the session
    key, and the channel joins the group of the authenticated user and the
    group of the device cookie (each only when present). On disconnect the
    cache entry is removed and the channel leaves those groups again.
    """

    # Defaults to None so disconnect() can run even if connect() failed
    # before the session key was read.
    session_key: str | None = None
    device_cookie: str | None = None
    # Set by connect() once an authenticated user joined their group;
    # disconnect() uses it to leave that same group.
    user: User | None = None

    def connect(self):
        """Accept the socket and register the channel in the cache and groups."""
        self.accept()
        self.session_key = self.scope["session"].session_key
        if self.session_key:
            # timeout=None: the entry stays until disconnect() deletes it
            cache.set(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}", True, None)
        user = self.scope.get("user")
        if user and user.is_authenticated:
            # Remember the user, otherwise disconnect() can never discard
            # this channel from the user group.
            self.user = user
            async_to_sync(self.channel_layer.group_add)(
                build_user_group(user), self.channel_name
            )
        if device_cookie := self.scope["cookies"].get("authentik_device", None):
            self.device_cookie = device_cookie
            async_to_sync(self.channel_layer.group_add)(
                build_device_group(self.device_cookie), self.channel_name
            )

    def disconnect(self, code):
        """Undo everything connect() registered for this channel.

        ``code`` is the close code passed in by the framework; it is not used.
        """
        if self.session_key:
            cache.delete(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}")
        if self.device_cookie:
            async_to_sync(self.channel_layer.group_discard)(
                build_device_group(self.device_cookie), self.channel_name
            )
        if self.user:
            async_to_sync(self.channel_layer.group_discard)(
                build_user_group(self.user), self.channel_name
            )

    def event_message(self, event: dict):
        """Event handler which is called by Messages Storage backend.

        The event is forwarded to the client unchanged.
        """
        self.send_json(event)

    def event_session_authenticated(self, event: dict):
        """Event handler called after user authentication.

        Sends the event with ``message_type`` defaulting to
        ``session.authenticated``; a ``message_type`` key in ``event`` wins.
        """
        self.send_json({"message_type": "session.authenticated", **event})

    def event_notification(self, event: dict):
        """Event handler for new notifications.

        Sends the event with ``message_type`` defaulting to
        ``notification.new``; a ``message_type`` key in ``event`` wins.
        """
        self.send_json({"message_type": "notification.new", **event})

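Consumer which sends django.contrib.messages Messages over WS. On connect, the channel_name is saved into the cache under the session key (not the user ID), presumably so that the channel can be found when add_message is called; the entry is deleted again on disconnect.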

session_key: str
user: authentik.core.models.User | None = None
def connect(self):
39    def connect(self):
40        self.accept()
41        self.session_key = self.scope["session"].session_key
42        if self.session_key:
43            cache.set(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}", True, None)
44        if user := self.scope.get("user"):
45            if user.is_authenticated:
46                async_to_sync(self.channel_layer.group_add)(
47                    build_user_group(user), self.channel_name
48                )
49        if device_cookie := self.scope["cookies"].get("authentik_device", None):
50            self.device_cookie = device_cookie
51            async_to_sync(self.channel_layer.group_add)(
52                build_device_group(self.device_cookie), self.channel_name
53            )
def disconnect(self, code):
55    def disconnect(self, code):
56        if self.session_key:
57            cache.delete(f"{CACHE_PREFIX}{self.session_key}_messages_{self.channel_name}")
58        if self.device_cookie:
59            async_to_sync(self.channel_layer.group_discard)(
60                build_device_group(self.device_cookie), self.channel_name
61            )
62        if self.user:
63            async_to_sync(self.channel_layer.group_discard)(
64                build_user_group(self.user), self.channel_name
65            )

Called when a WebSocket connection is closed.

def event_message(self, event: dict):
67    def event_message(self, event: dict):
68        """Event handler which is called by Messages Storage backend"""
69        self.send_json(event)

Event handler which is called by Messages Storage backend

def event_session_authenticated(self, event: dict):
71    def event_session_authenticated(self, event: dict):
72        """Event handler post user authentication"""
73        self.send_json({"message_type": "session.authenticated", **event})

Event handler called after user authentication

def event_notification(self, event: dict):
75    def event_notification(self, event: dict):
76        """Event handler for new notifications"""
77        self.send_json({"message_type": "notification.new", **event})

Event handler for new notifications