authentik.flows.api.flows_diagram

Flows Diagram API

  1"""Flows Diagram API"""
  2
  3from dataclasses import dataclass, field
  4
  5from django.utils.translation import gettext as _
  6from guardian.shortcuts import get_objects_for_user
  7from rest_framework.serializers import CharField
  8
  9from authentik.core.api.utils import PassiveSerializer
 10from authentik.core.models import User
 11from authentik.flows.models import Flow, FlowAuthenticationRequirement, FlowStageBinding
 12
 13
 14@dataclass
 15class DiagramElement:
 16    """Single element used in a diagram"""
 17
 18    identifier: str
 19    description: str
 20    action: str | None = None
 21    source: list[DiagramElement] | None = None
 22
 23    style: list[str] = field(default_factory=lambda: ["[", "]"])
 24
 25    def __str__(self) -> str:
 26        description = self.description.replace('"', "#quot;")
 27        element = f'{self.identifier}{self.style[0]}"{description}"{self.style[1]}'
 28        if self.action is not None:
 29            if self.action != "":
 30                element = f"--{self.action}--> {element}"
 31            else:
 32                element = f"--> {element}"
 33        if self.source:
 34            source_element = []
 35            for source in self.source:
 36                source_element.append(f"{source.identifier} {element}")
 37            return "\n".join(source_element)
 38        return element
 39
 40
 41class FlowDiagramSerializer(PassiveSerializer):
 42    """response of the flow's diagram action"""
 43
 44    diagram = CharField(read_only=True)
 45
 46
 47class FlowDiagram:
 48    """Generate flow chart fow a flow"""
 49
 50    flow: Flow
 51    user: User
 52
 53    def __init__(self, flow: Flow, user: User) -> None:
 54        self.flow = flow
 55        self.user = user
 56
 57    def get_flow_policies(self, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
 58        """Collect all policies bound to the flow"""
 59        elements = []
 60        for p_index, policy_binding in enumerate(
 61            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
 62            .filter(target=self.flow)
 63            .exclude(policy__isnull=True)
 64            .order_by("order")
 65        ):
 66            element = DiagramElement(
 67                f"flow_policy_{p_index}",
 68                _("Policy ({type})".format_map({"type": policy_binding.policy._meta.verbose_name}))
 69                + "\n"
 70                + policy_binding.policy.name,
 71                _("Binding {order}".format_map({"order": policy_binding.order})),
 72                parent_elements,
 73                style=["{{", "}}"],
 74            )
 75            elements.append(element)
 76        return elements
 77
 78    def get_stage_policies(
 79        self,
 80        stage_index: int,
 81        stage_binding: FlowStageBinding,
 82        parent_elements: list[DiagramElement],
 83    ) -> list[DiagramElement]:
 84        """First all policies bound to stages since they execute before stages"""
 85        elements = []
 86        for p_index, policy_binding in enumerate(
 87            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
 88            .filter(target=stage_binding)
 89            .exclude(policy__isnull=True)
 90            .order_by("order")
 91        ):
 92            element = DiagramElement(
 93                f"stage_{stage_index}_policy_{p_index}",
 94                _("Policy ({type})".format_map({"type": policy_binding.policy._meta.verbose_name}))
 95                + "\n"
 96                + policy_binding.policy.name,
 97                "",
 98                parent_elements,
 99                style=["{{", "}}"],
100            )
101            elements.append(element)
102        return elements
103
104    def get_stages(self, parent_elements: list[DiagramElement]) -> list[str | DiagramElement]:
105        """Collect all stages"""
106        elements = []
107        stages = []
108        for s_index, stage_binding in enumerate(
109            get_objects_for_user(self.user, "authentik_flows.view_flowstagebinding")
110            .filter(target=self.flow)
111            .order_by("order")
112        ):
113            stage_policies = self.get_stage_policies(s_index, stage_binding, parent_elements)
114            elements.extend(stage_policies)
115
116            action = ""
117            if len(stage_policies) > 0:
118                action = _("Policy passed")
119
120            element = DiagramElement(
121                f"stage_{s_index}",
122                _("Stage ({type})".format_map({"type": stage_binding.stage._meta.verbose_name}))
123                + "\n"
124                + stage_binding.stage.name,
125                action,
126                stage_policies,
127                style=["([", "])"],
128            )
129            stages.append(element)
130
131            parent_elements = [element]
132
133            # This adds connections for policy denies, but retroactively, as we can't really
134            # look ahead
135            # Check if we have a stage behind us and if it has any sources
136            if s_index > 0:
137                last_stage: DiagramElement = stages[s_index - 1]
138                if last_stage.source and len(last_stage.source) > 0:
139                    # If it has any sources, add a connection from each of that stage's sources
140                    # to this stage
141                    for source in last_stage.source:
142                        elements.append(
143                            DiagramElement(
144                                element.identifier,
145                                element.description,
146                                _("Policy denied"),
147                                [source],
148                                style=element.style,
149                            )
150                        )
151
152        if len(stages) > 0:
153            elements.append(
154                DiagramElement(
155                    "done",
156                    _("End of the flow"),
157                    "",
158                    [stages[-1]],
159                    style=["[[", "]]"],
160                ),
161            )
162        return stages + elements
163
164    def get_flow_auth_requirement(self) -> list[DiagramElement]:
165        """Get flow authentication requirement"""
166        end_el = DiagramElement(
167            "done",
168            _("End of the flow"),
169            _("Requirement not fulfilled"),
170            style=["[[", "]]"],
171        )
172        elements = []
173        if self.flow.authentication == FlowAuthenticationRequirement.NONE:
174            return []
175        auth = DiagramElement(
176            "flow_auth_requirement",
177            _("Flow authentication requirement") + "\n" + self.flow.authentication,
178        )
179        elements.append(auth)
180        end_el.source = [auth]
181        elements.append(end_el)
182        elements.append(
183            DiagramElement("flow_start", "placeholder", _("Requirement fulfilled"), source=[auth])
184        )
185        return elements
186
187    def build(self) -> str:
188        """Build flowchart"""
189        all_elements = [
190            "graph TD",
191        ]
192
193        all_elements.extend(self.get_flow_auth_requirement())
194
195        pre_flow_policies_element = DiagramElement(
196            "flow_pre", _("Pre-flow policies"), style=["[[", "]]"]
197        )
198        flow_policies = self.get_flow_policies([pre_flow_policies_element])
199        if len(flow_policies) > 0:
200            all_elements.append(pre_flow_policies_element)
201            all_elements.extend(flow_policies)
202            all_elements.append(
203                DiagramElement(
204                    "done",
205                    _("End of the flow"),
206                    _("Policy denied"),
207                    flow_policies,
208                    style=["[[", "]]"],
209                )
210            )
211
212        flow_element = DiagramElement(
213            "flow_start",
214            _("Flow") + "\n" + self.flow.name,
215            "" if len(flow_policies) > 0 else None,
216            source=flow_policies,
217            style=["[[", "]]"],
218        )
219        all_elements.append(flow_element)
220
221        stages = self.get_stages([flow_element])
222        all_elements.extend(stages)
223        if len(stages) < 1:
224            all_elements.append(
225                DiagramElement(
226                    "done",
227                    _("End of the flow"),
228                    "",
229                    [flow_element],
230                    style=["[[", "]]"],
231                ),
232            )
233        return "\n".join([str(x) for x in all_elements])
@dataclass
class DiagramElement:
15@dataclass
16class DiagramElement:
17    """Single element used in a diagram"""
18
19    identifier: str
20    description: str
21    action: str | None = None
22    source: list[DiagramElement] | None = None
23
24    style: list[str] = field(default_factory=lambda: ["[", "]"])
25
26    def __str__(self) -> str:
27        description = self.description.replace('"', "#quot;")
28        element = f'{self.identifier}{self.style[0]}"{description}"{self.style[1]}'
29        if self.action is not None:
30            if self.action != "":
31                element = f"--{self.action}--> {element}"
32            else:
33                element = f"--> {element}"
34        if self.source:
35            source_element = []
36            for source in self.source:
37                source_element.append(f"{source.identifier} {element}")
38            return "\n".join(source_element)
39        return element

Single element used in a diagram

DiagramElement( identifier: str, description: str, action: str | None = None, source: list[DiagramElement] | None = None, style: list[str] = <factory>)
identifier: str
description: str
action: str | None = None
source: list[DiagramElement] | None = None
style: list[str]
class FlowDiagramSerializer(authentik.core.api.utils.PassiveSerializer):
class FlowDiagramSerializer(PassiveSerializer):
    """Response of the flow's diagram action"""

    # Diagram source text; read-only, so it only appears in responses
    diagram = CharField(read_only=True)

response of the flow's diagram action

diagram
class FlowDiagram:
 48class FlowDiagram:
 49    """Generate flow chart fow a flow"""
 50
 51    flow: Flow
 52    user: User
 53
 54    def __init__(self, flow: Flow, user: User) -> None:
 55        self.flow = flow
 56        self.user = user
 57
 58    def get_flow_policies(self, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
 59        """Collect all policies bound to the flow"""
 60        elements = []
 61        for p_index, policy_binding in enumerate(
 62            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
 63            .filter(target=self.flow)
 64            .exclude(policy__isnull=True)
 65            .order_by("order")
 66        ):
 67            element = DiagramElement(
 68                f"flow_policy_{p_index}",
 69                _("Policy ({type})".format_map({"type": policy_binding.policy._meta.verbose_name}))
 70                + "\n"
 71                + policy_binding.policy.name,
 72                _("Binding {order}".format_map({"order": policy_binding.order})),
 73                parent_elements,
 74                style=["{{", "}}"],
 75            )
 76            elements.append(element)
 77        return elements
 78
 79    def get_stage_policies(
 80        self,
 81        stage_index: int,
 82        stage_binding: FlowStageBinding,
 83        parent_elements: list[DiagramElement],
 84    ) -> list[DiagramElement]:
 85        """First all policies bound to stages since they execute before stages"""
 86        elements = []
 87        for p_index, policy_binding in enumerate(
 88            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
 89            .filter(target=stage_binding)
 90            .exclude(policy__isnull=True)
 91            .order_by("order")
 92        ):
 93            element = DiagramElement(
 94                f"stage_{stage_index}_policy_{p_index}",
 95                _("Policy ({type})".format_map({"type": policy_binding.policy._meta.verbose_name}))
 96                + "\n"
 97                + policy_binding.policy.name,
 98                "",
 99                parent_elements,
100                style=["{{", "}}"],
101            )
102            elements.append(element)
103        return elements
104
105    def get_stages(self, parent_elements: list[DiagramElement]) -> list[str | DiagramElement]:
106        """Collect all stages"""
107        elements = []
108        stages = []
109        for s_index, stage_binding in enumerate(
110            get_objects_for_user(self.user, "authentik_flows.view_flowstagebinding")
111            .filter(target=self.flow)
112            .order_by("order")
113        ):
114            stage_policies = self.get_stage_policies(s_index, stage_binding, parent_elements)
115            elements.extend(stage_policies)
116
117            action = ""
118            if len(stage_policies) > 0:
119                action = _("Policy passed")
120
121            element = DiagramElement(
122                f"stage_{s_index}",
123                _("Stage ({type})".format_map({"type": stage_binding.stage._meta.verbose_name}))
124                + "\n"
125                + stage_binding.stage.name,
126                action,
127                stage_policies,
128                style=["([", "])"],
129            )
130            stages.append(element)
131
132            parent_elements = [element]
133
134            # This adds connections for policy denies, but retroactively, as we can't really
135            # look ahead
136            # Check if we have a stage behind us and if it has any sources
137            if s_index > 0:
138                last_stage: DiagramElement = stages[s_index - 1]
139                if last_stage.source and len(last_stage.source) > 0:
140                    # If it has any sources, add a connection from each of that stage's sources
141                    # to this stage
142                    for source in last_stage.source:
143                        elements.append(
144                            DiagramElement(
145                                element.identifier,
146                                element.description,
147                                _("Policy denied"),
148                                [source],
149                                style=element.style,
150                            )
151                        )
152
153        if len(stages) > 0:
154            elements.append(
155                DiagramElement(
156                    "done",
157                    _("End of the flow"),
158                    "",
159                    [stages[-1]],
160                    style=["[[", "]]"],
161                ),
162            )
163        return stages + elements
164
165    def get_flow_auth_requirement(self) -> list[DiagramElement]:
166        """Get flow authentication requirement"""
167        end_el = DiagramElement(
168            "done",
169            _("End of the flow"),
170            _("Requirement not fulfilled"),
171            style=["[[", "]]"],
172        )
173        elements = []
174        if self.flow.authentication == FlowAuthenticationRequirement.NONE:
175            return []
176        auth = DiagramElement(
177            "flow_auth_requirement",
178            _("Flow authentication requirement") + "\n" + self.flow.authentication,
179        )
180        elements.append(auth)
181        end_el.source = [auth]
182        elements.append(end_el)
183        elements.append(
184            DiagramElement("flow_start", "placeholder", _("Requirement fulfilled"), source=[auth])
185        )
186        return elements
187
188    def build(self) -> str:
189        """Build flowchart"""
190        all_elements = [
191            "graph TD",
192        ]
193
194        all_elements.extend(self.get_flow_auth_requirement())
195
196        pre_flow_policies_element = DiagramElement(
197            "flow_pre", _("Pre-flow policies"), style=["[[", "]]"]
198        )
199        flow_policies = self.get_flow_policies([pre_flow_policies_element])
200        if len(flow_policies) > 0:
201            all_elements.append(pre_flow_policies_element)
202            all_elements.extend(flow_policies)
203            all_elements.append(
204                DiagramElement(
205                    "done",
206                    _("End of the flow"),
207                    _("Policy denied"),
208                    flow_policies,
209                    style=["[[", "]]"],
210                )
211            )
212
213        flow_element = DiagramElement(
214            "flow_start",
215            _("Flow") + "\n" + self.flow.name,
216            "" if len(flow_policies) > 0 else None,
217            source=flow_policies,
218            style=["[[", "]]"],
219        )
220        all_elements.append(flow_element)
221
222        stages = self.get_stages([flow_element])
223        all_elements.extend(stages)
224        if len(stages) < 1:
225            all_elements.append(
226                DiagramElement(
227                    "done",
228                    _("End of the flow"),
229                    "",
230                    [flow_element],
231                    style=["[[", "]]"],
232                ),
233            )
234        return "\n".join([str(x) for x in all_elements])

Generate a flow chart for a flow

FlowDiagram(flow: authentik.flows.models.Flow, user: authentik.core.models.User)
54    def __init__(self, flow: Flow, user: User) -> None:
55        self.flow = flow
56        self.user = user
def get_flow_policies( self, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
58    def get_flow_policies(self, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
59        """Collect all policies bound to the flow"""
60        elements = []
61        for p_index, policy_binding in enumerate(
62            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
63            .filter(target=self.flow)
64            .exclude(policy__isnull=True)
65            .order_by("order")
66        ):
67            element = DiagramElement(
68                f"flow_policy_{p_index}",
69                _("Policy ({type})".format_map({"type": policy_binding.policy._meta.verbose_name}))
70                + "\n"
71                + policy_binding.policy.name,
72                _("Binding {order}".format_map({"order": policy_binding.order})),
73                parent_elements,
74                style=["{{", "}}"],
75            )
76            elements.append(element)
77        return elements

Collect all policies bound to the flow

def get_stage_policies( self, stage_index: int, stage_binding: authentik.flows.models.FlowStageBinding, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
 79    def get_stage_policies(
 80        self,
 81        stage_index: int,
 82        stage_binding: FlowStageBinding,
 83        parent_elements: list[DiagramElement],
 84    ) -> list[DiagramElement]:
 85        """First all policies bound to stages since they execute before stages"""
 86        elements = []
 87        for p_index, policy_binding in enumerate(
 88            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
 89            .filter(target=stage_binding)
 90            .exclude(policy__isnull=True)
 91            .order_by("order")
 92        ):
 93            element = DiagramElement(
 94                f"stage_{stage_index}_policy_{p_index}",
 95                _("Policy ({type})".format_map({"type": policy_binding.policy._meta.verbose_name}))
 96                + "\n"
 97                + policy_binding.policy.name,
 98                "",
 99                parent_elements,
100                style=["{{", "}}"],
101            )
102            elements.append(element)
103        return elements

Collect all policies bound to a stage binding first, since they execute before the stage

def get_stages( self, parent_elements: list[DiagramElement]) -> list[str | DiagramElement]:
105    def get_stages(self, parent_elements: list[DiagramElement]) -> list[str | DiagramElement]:
106        """Collect all stages"""
107        elements = []
108        stages = []
109        for s_index, stage_binding in enumerate(
110            get_objects_for_user(self.user, "authentik_flows.view_flowstagebinding")
111            .filter(target=self.flow)
112            .order_by("order")
113        ):
114            stage_policies = self.get_stage_policies(s_index, stage_binding, parent_elements)
115            elements.extend(stage_policies)
116
117            action = ""
118            if len(stage_policies) > 0:
119                action = _("Policy passed")
120
121            element = DiagramElement(
122                f"stage_{s_index}",
123                _("Stage ({type})".format_map({"type": stage_binding.stage._meta.verbose_name}))
124                + "\n"
125                + stage_binding.stage.name,
126                action,
127                stage_policies,
128                style=["([", "])"],
129            )
130            stages.append(element)
131
132            parent_elements = [element]
133
134            # This adds connections for policy denies, but retroactively, as we can't really
135            # look ahead
136            # Check if we have a stage behind us and if it has any sources
137            if s_index > 0:
138                last_stage: DiagramElement = stages[s_index - 1]
139                if last_stage.source and len(last_stage.source) > 0:
140                    # If it has any sources, add a connection from each of that stage's sources
141                    # to this stage
142                    for source in last_stage.source:
143                        elements.append(
144                            DiagramElement(
145                                element.identifier,
146                                element.description,
147                                _("Policy denied"),
148                                [source],
149                                style=element.style,
150                            )
151                        )
152
153        if len(stages) > 0:
154            elements.append(
155                DiagramElement(
156                    "done",
157                    _("End of the flow"),
158                    "",
159                    [stages[-1]],
160                    style=["[[", "]]"],
161                ),
162            )
163        return stages + elements

Collect all stages

def get_flow_auth_requirement(self) -> list[DiagramElement]:
165    def get_flow_auth_requirement(self) -> list[DiagramElement]:
166        """Get flow authentication requirement"""
167        end_el = DiagramElement(
168            "done",
169            _("End of the flow"),
170            _("Requirement not fulfilled"),
171            style=["[[", "]]"],
172        )
173        elements = []
174        if self.flow.authentication == FlowAuthenticationRequirement.NONE:
175            return []
176        auth = DiagramElement(
177            "flow_auth_requirement",
178            _("Flow authentication requirement") + "\n" + self.flow.authentication,
179        )
180        elements.append(auth)
181        end_el.source = [auth]
182        elements.append(end_el)
183        elements.append(
184            DiagramElement("flow_start", "placeholder", _("Requirement fulfilled"), source=[auth])
185        )
186        return elements

Get flow authentication requirement

def build(self) -> str:
188    def build(self) -> str:
189        """Build flowchart"""
190        all_elements = [
191            "graph TD",
192        ]
193
194        all_elements.extend(self.get_flow_auth_requirement())
195
196        pre_flow_policies_element = DiagramElement(
197            "flow_pre", _("Pre-flow policies"), style=["[[", "]]"]
198        )
199        flow_policies = self.get_flow_policies([pre_flow_policies_element])
200        if len(flow_policies) > 0:
201            all_elements.append(pre_flow_policies_element)
202            all_elements.extend(flow_policies)
203            all_elements.append(
204                DiagramElement(
205                    "done",
206                    _("End of the flow"),
207                    _("Policy denied"),
208                    flow_policies,
209                    style=["[[", "]]"],
210                )
211            )
212
213        flow_element = DiagramElement(
214            "flow_start",
215            _("Flow") + "\n" + self.flow.name,
216            "" if len(flow_policies) > 0 else None,
217            source=flow_policies,
218            style=["[[", "]]"],
219        )
220        all_elements.append(flow_element)
221
222        stages = self.get_stages([flow_element])
223        all_elements.extend(stages)
224        if len(stages) < 1:
225            all_elements.append(
226                DiagramElement(
227                    "done",
228                    _("End of the flow"),
229                    "",
230                    [flow_element],
231                    style=["[[", "]]"],
232                ),
233            )
234        return "\n".join([str(x) for x in all_elements])

Build flowchart