authentik.flows.api.flows_diagram
Flows Diagram API
"""Flows Diagram API"""

from dataclasses import dataclass, field

from django.utils.translation import gettext as _
from guardian.shortcuts import get_objects_for_user
from rest_framework.serializers import CharField

from authentik.core.api.utils import PassiveSerializer
from authentik.core.models import User
from authentik.flows.models import Flow, FlowAuthenticationRequirement, FlowStageBinding


@dataclass
class DiagramElement:
    """Single element used in a diagram.

    ``str()`` renders the element as one or more lines of flowchart text
    (the output looks like Mermaid flowchart syntax -- confirm against the consumer).
    """

    # Node id used in the rendered text
    identifier: str
    # Node label; double quotes are replaced with ``#quot;`` when rendering
    description: str
    # Edge label: None renders no arrow, "" an unlabelled arrow, anything else a labelled arrow
    action: str | None = None
    # Elements the edge starts from; one output line is rendered per entry.
    # Quoted because the class name is not bound yet while its own body is being evaluated.
    source: list["DiagramElement"] | None = None

    # Opening and closing delimiter placed around the quoted description
    style: list[str] = field(default_factory=lambda: ["[", "]"])

    def __str__(self) -> str:
        description = self.description.replace('"', "#quot;")
        element = f'{self.identifier}{self.style[0]}"{description}"{self.style[1]}'
        if self.action is not None:
            arrow = f"--{self.action}-->" if self.action != "" else "-->"
            element = f"{arrow} {element}"
        if not self.source:
            return element
        return "\n".join(f"{source.identifier} {element}" for source in self.source)


class FlowDiagramSerializer(PassiveSerializer):
    """response of the flow's diagram action"""

    diagram = CharField(read_only=True)


class FlowDiagram:
    """Generate flow chart for a flow"""

    flow: Flow
    user: User

    def __init__(self, flow: Flow, user: User) -> None:
        self.flow = flow
        self.user = user

    def get_flow_policies(self, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
        """Collect all policies bound to the flow.

        Returns one element per policy binding of ``self.flow`` that has a policy set,
        ordered by ``order``; each element is connected from ``parent_elements``.
        """
        bindings = (
            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
            .filter(target=self.flow)
            .exclude(policy__isnull=True)
            .order_by("order")
        )
        return [
            DiagramElement(
                f"flow_policy_{p_index}",
                # Translate the template first and interpolate afterwards; looking up the
                # already formatted string would never match a catalog entry
                _("Policy ({type})").format_map({"type": binding.policy._meta.verbose_name})
                + "\n"
                + binding.policy.name,
                _("Binding {order}").format_map({"order": binding.order}),
                parent_elements,
                style=["{{", "}}"],
            )
            for p_index, binding in enumerate(bindings)
        ]

    def get_stage_policies(
        self,
        stage_index: int,
        stage_binding: FlowStageBinding,
        parent_elements: list[DiagramElement],
    ) -> list[DiagramElement]:
        """Collect all policies bound to a stage binding; they are placed before the stage.

        Returns one element per policy binding of ``stage_binding`` that has a policy set,
        ordered by ``order``; each element is connected from ``parent_elements``.
        """
        bindings = (
            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
            .filter(target=stage_binding)
            .exclude(policy__isnull=True)
            .order_by("order")
        )
        return [
            DiagramElement(
                f"stage_{stage_index}_policy_{p_index}",
                # Translate the template first, then interpolate (see get_flow_policies)
                _("Policy ({type})").format_map({"type": binding.policy._meta.verbose_name})
                + "\n"
                + binding.policy.name,
                "",
                parent_elements,
                style=["{{", "}}"],
            )
            for p_index, binding in enumerate(bindings)
        ]

    def get_stages(self, parent_elements: list[DiagramElement]) -> list[str | DiagramElement]:
        """Collect all stages.

        Returns the stage elements first, followed by the stage policy elements, the
        "Policy denied" connections and, if there is at least one stage, the end element.
        """
        elements = []
        stages = []
        stage_bindings = (
            get_objects_for_user(self.user, "authentik_flows.view_flowstagebinding")
            .filter(target=self.flow)
            .order_by("order")
        )
        for s_index, stage_binding in enumerate(stage_bindings):
            stage_policies = self.get_stage_policies(s_index, stage_binding, parent_elements)
            elements.extend(stage_policies)

            element = DiagramElement(
                f"stage_{s_index}",
                # Translate the template first, then interpolate
                _("Stage ({type})").format_map({"type": stage_binding.stage._meta.verbose_name})
                + "\n"
                + stage_binding.stage.name,
                # The edge is only labelled when policies sit in front of the stage
                _("Policy passed") if stage_policies else "",
                stage_policies,
                style=["([", "])"],
            )
            stages.append(element)

            # Whatever comes next hangs off this stage
            parent_elements = [element]

            # This adds connections for policy denies, but retroactively, as we can't really
            # look ahead
            if s_index > 0:
                last_stage: DiagramElement = stages[s_index - 1]
                # Connect each source of the previous stage to this stage
                for source in last_stage.source or []:
                    elements.append(
                        DiagramElement(
                            element.identifier,
                            element.description,
                            _("Policy denied"),
                            [source],
                            style=element.style,
                        )
                    )

        if stages:
            elements.append(
                DiagramElement(
                    "done",
                    _("End of the flow"),
                    "",
                    [stages[-1]],
                    style=["[[", "]]"],
                ),
            )
        return stages + elements

    def get_flow_auth_requirement(self) -> list[DiagramElement]:
        """Get flow authentication requirement.

        Returns an empty list when the flow has no authentication requirement, otherwise
        the requirement element plus its "not fulfilled" and "fulfilled" connections.
        """
        if self.flow.authentication == FlowAuthenticationRequirement.NONE:
            return []
        auth = DiagramElement(
            "flow_auth_requirement",
            _("Flow authentication requirement") + "\n" + self.flow.authentication,
        )
        end_el = DiagramElement(
            "done",
            _("End of the flow"),
            _("Requirement not fulfilled"),
            [auth],
            style=["[[", "]]"],
        )
        # Only the connection matters here; build() emits flow_start again with its real label
        flow_start = DiagramElement(
            "flow_start", "placeholder", _("Requirement fulfilled"), source=[auth]
        )
        return [auth, end_el, flow_start]

    def build(self) -> str:
        """Build flowchart and return it as newline-joined text"""
        all_elements = ["graph TD", *self.get_flow_auth_requirement()]

        pre_flow_policies_element = DiagramElement(
            "flow_pre", _("Pre-flow policies"), style=["[[", "]]"]
        )
        flow_policies = self.get_flow_policies([pre_flow_policies_element])
        has_flow_policies = len(flow_policies) > 0
        if has_flow_policies:
            all_elements.append(pre_flow_policies_element)
            all_elements.extend(flow_policies)
            all_elements.append(
                DiagramElement(
                    "done",
                    _("End of the flow"),
                    _("Policy denied"),
                    flow_policies,
                    style=["[[", "]]"],
                )
            )

        flow_element = DiagramElement(
            "flow_start",
            _("Flow") + "\n" + self.flow.name,
            # No incoming arrow when nothing precedes the flow node
            "" if has_flow_policies else None,
            source=flow_policies,
            style=["[[", "]]"],
        )
        all_elements.append(flow_element)

        stages = self.get_stages([flow_element])
        all_elements.extend(stages)
        if not stages:
            # A flow without stages ends right after the flow node
            all_elements.append(
                DiagramElement(
                    "done",
                    _("End of the flow"),
                    "",
                    [flow_element],
                    style=["[[", "]]"],
                ),
            )
        return "\n".join(str(x) for x in all_elements)
@dataclass
class
DiagramElement:
15@dataclass 16class DiagramElement: 17 """Single element used in a diagram""" 18 19 identifier: str 20 description: str 21 action: str | None = None 22 source: list[DiagramElement] | None = None 23 24 style: list[str] = field(default_factory=lambda: ["[", "]"]) 25 26 def __str__(self) -> str: 27 description = self.description.replace('"', "#quot;") 28 element = f'{self.identifier}{self.style[0]}"{description}"{self.style[1]}' 29 if self.action is not None: 30 if self.action != "": 31 element = f"--{self.action}--> {element}" 32 else: 33 element = f"--> {element}" 34 if self.source: 35 source_element = [] 36 for source in self.source: 37 source_element.append(f"{source.identifier} {element}") 38 return "\n".join(source_element) 39 return element
Single element used in a diagram
DiagramElement( identifier: str, description: str, action: str | None = None, source: list[DiagramElement] | None = None, style: list[str] = <factory>)
class FlowDiagramSerializer(PassiveSerializer):
    """response of the flow's diagram action"""

    # Read-only text field; presumably carries the string returned by FlowDiagram.build()
    # -- confirm against the view that uses this serializer
    diagram = CharField(read_only=True)
response of the flow's diagram action
Inherited Members
class
FlowDiagram:
class FlowDiagram:
    """Generate flow chart for a flow"""

    flow: Flow
    user: User

    def __init__(self, flow: Flow, user: User) -> None:
        self.flow = flow
        self.user = user

    def get_flow_policies(self, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
        """Collect all policies bound to the flow.

        Returns one element per policy binding of ``self.flow`` that has a policy set,
        ordered by ``order``; each element is connected from ``parent_elements``.
        """
        bindings = (
            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
            .filter(target=self.flow)
            .exclude(policy__isnull=True)
            .order_by("order")
        )
        return [
            DiagramElement(
                f"flow_policy_{p_index}",
                # Translate the template first and interpolate afterwards; looking up the
                # already formatted string would never match a catalog entry
                _("Policy ({type})").format_map({"type": binding.policy._meta.verbose_name})
                + "\n"
                + binding.policy.name,
                _("Binding {order}").format_map({"order": binding.order}),
                parent_elements,
                style=["{{", "}}"],
            )
            for p_index, binding in enumerate(bindings)
        ]

    def get_stage_policies(
        self,
        stage_index: int,
        stage_binding: FlowStageBinding,
        parent_elements: list[DiagramElement],
    ) -> list[DiagramElement]:
        """Collect all policies bound to a stage binding; they are placed before the stage.

        Returns one element per policy binding of ``stage_binding`` that has a policy set,
        ordered by ``order``; each element is connected from ``parent_elements``.
        """
        bindings = (
            get_objects_for_user(self.user, "authentik_policies.view_policybinding")
            .filter(target=stage_binding)
            .exclude(policy__isnull=True)
            .order_by("order")
        )
        return [
            DiagramElement(
                f"stage_{stage_index}_policy_{p_index}",
                # Translate the template first, then interpolate (see get_flow_policies)
                _("Policy ({type})").format_map({"type": binding.policy._meta.verbose_name})
                + "\n"
                + binding.policy.name,
                "",
                parent_elements,
                style=["{{", "}}"],
            )
            for p_index, binding in enumerate(bindings)
        ]

    def get_stages(self, parent_elements: list[DiagramElement]) -> list[str | DiagramElement]:
        """Collect all stages.

        Returns the stage elements first, followed by the stage policy elements, the
        "Policy denied" connections and, if there is at least one stage, the end element.
        """
        elements = []
        stages = []
        stage_bindings = (
            get_objects_for_user(self.user, "authentik_flows.view_flowstagebinding")
            .filter(target=self.flow)
            .order_by("order")
        )
        for s_index, stage_binding in enumerate(stage_bindings):
            stage_policies = self.get_stage_policies(s_index, stage_binding, parent_elements)
            elements.extend(stage_policies)

            element = DiagramElement(
                f"stage_{s_index}",
                # Translate the template first, then interpolate
                _("Stage ({type})").format_map({"type": stage_binding.stage._meta.verbose_name})
                + "\n"
                + stage_binding.stage.name,
                # The edge is only labelled when policies sit in front of the stage
                _("Policy passed") if stage_policies else "",
                stage_policies,
                style=["([", "])"],
            )
            stages.append(element)

            # Whatever comes next hangs off this stage
            parent_elements = [element]

            # This adds connections for policy denies, but retroactively, as we can't really
            # look ahead
            if s_index > 0:
                last_stage: DiagramElement = stages[s_index - 1]
                # Connect each source of the previous stage to this stage
                for source in last_stage.source or []:
                    elements.append(
                        DiagramElement(
                            element.identifier,
                            element.description,
                            _("Policy denied"),
                            [source],
                            style=element.style,
                        )
                    )

        if stages:
            elements.append(
                DiagramElement(
                    "done",
                    _("End of the flow"),
                    "",
                    [stages[-1]],
                    style=["[[", "]]"],
                ),
            )
        return stages + elements

    def get_flow_auth_requirement(self) -> list[DiagramElement]:
        """Get flow authentication requirement.

        Returns an empty list when the flow has no authentication requirement, otherwise
        the requirement element plus its "not fulfilled" and "fulfilled" connections.
        """
        if self.flow.authentication == FlowAuthenticationRequirement.NONE:
            return []
        auth = DiagramElement(
            "flow_auth_requirement",
            _("Flow authentication requirement") + "\n" + self.flow.authentication,
        )
        end_el = DiagramElement(
            "done",
            _("End of the flow"),
            _("Requirement not fulfilled"),
            [auth],
            style=["[[", "]]"],
        )
        # Only the connection matters here; build() emits flow_start again with its real label
        flow_start = DiagramElement(
            "flow_start", "placeholder", _("Requirement fulfilled"), source=[auth]
        )
        return [auth, end_el, flow_start]

    def build(self) -> str:
        """Build flowchart and return it as newline-joined text"""
        all_elements = ["graph TD", *self.get_flow_auth_requirement()]

        pre_flow_policies_element = DiagramElement(
            "flow_pre", _("Pre-flow policies"), style=["[[", "]]"]
        )
        flow_policies = self.get_flow_policies([pre_flow_policies_element])
        has_flow_policies = len(flow_policies) > 0
        if has_flow_policies:
            all_elements.append(pre_flow_policies_element)
            all_elements.extend(flow_policies)
            all_elements.append(
                DiagramElement(
                    "done",
                    _("End of the flow"),
                    _("Policy denied"),
                    flow_policies,
                    style=["[[", "]]"],
                )
            )

        flow_element = DiagramElement(
            "flow_start",
            _("Flow") + "\n" + self.flow.name,
            # No incoming arrow when nothing precedes the flow node
            "" if has_flow_policies else None,
            source=flow_policies,
            style=["[[", "]]"],
        )
        all_elements.append(flow_element)

        stages = self.get_stages([flow_element])
        all_elements.extend(stages)
        if not stages:
            # A flow without stages ends right after the flow node
            all_elements.append(
                DiagramElement(
                    "done",
                    _("End of the flow"),
                    "",
                    [flow_element],
                    style=["[[", "]]"],
                ),
            )
        return "\n".join(str(x) for x in all_elements)
Generate flow chart for a flow
FlowDiagram(flow: authentik.flows.models.Flow, user: authentik.core.models.User)
def get_flow_policies(self, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
    """Collect all policies bound to the flow.

    Returns one element per policy binding of ``self.flow`` that has a policy set,
    ordered by ``order``; each element is connected from ``parent_elements``.
    """
    bindings = (
        get_objects_for_user(self.user, "authentik_policies.view_policybinding")
        .filter(target=self.flow)
        .exclude(policy__isnull=True)
        .order_by("order")
    )
    return [
        DiagramElement(
            f"flow_policy_{p_index}",
            # Translate the template first and interpolate afterwards; looking up the
            # already formatted string would never match a catalog entry
            _("Policy ({type})").format_map({"type": binding.policy._meta.verbose_name})
            + "\n"
            + binding.policy.name,
            _("Binding {order}").format_map({"order": binding.order}),
            parent_elements,
            style=["{{", "}}"],
        )
        for p_index, binding in enumerate(bindings)
    ]
Collect all policies bound to the flow
def
get_stage_policies( self, stage_index: int, stage_binding: authentik.flows.models.FlowStageBinding, parent_elements: list[DiagramElement]) -> list[DiagramElement]:
def get_stage_policies(
    self,
    stage_index: int,
    stage_binding: FlowStageBinding,
    parent_elements: list[DiagramElement],
) -> list[DiagramElement]:
    """Collect all policies bound to a stage binding; they are placed before the stage.

    Returns one element per policy binding of ``stage_binding`` that has a policy set,
    ordered by ``order``; each element is connected from ``parent_elements``.
    """
    bindings = (
        get_objects_for_user(self.user, "authentik_policies.view_policybinding")
        .filter(target=stage_binding)
        .exclude(policy__isnull=True)
        .order_by("order")
    )
    return [
        DiagramElement(
            f"stage_{stage_index}_policy_{p_index}",
            # Translate the template first and interpolate afterwards; looking up the
            # already formatted string would never match a catalog entry
            _("Policy ({type})").format_map({"type": binding.policy._meta.verbose_name})
            + "\n"
            + binding.policy.name,
            "",
            parent_elements,
            style=["{{", "}}"],
        )
        for p_index, binding in enumerate(bindings)
    ]
Collect all policies bound to a stage first, since they execute before the stage
def get_stages(self, parent_elements: list[DiagramElement]) -> list[str | DiagramElement]:
    """Collect all stages.

    Returns the stage elements first, followed by the stage policy elements, the
    "Policy denied" connections and, if there is at least one stage, the end element.
    """
    elements = []
    stages = []
    stage_bindings = (
        get_objects_for_user(self.user, "authentik_flows.view_flowstagebinding")
        .filter(target=self.flow)
        .order_by("order")
    )
    for s_index, stage_binding in enumerate(stage_bindings):
        stage_policies = self.get_stage_policies(s_index, stage_binding, parent_elements)
        elements.extend(stage_policies)

        element = DiagramElement(
            f"stage_{s_index}",
            # Translate the template first and interpolate afterwards; looking up the
            # already formatted string would never match a catalog entry
            _("Stage ({type})").format_map({"type": stage_binding.stage._meta.verbose_name})
            + "\n"
            + stage_binding.stage.name,
            # The edge is only labelled when policies sit in front of the stage
            _("Policy passed") if stage_policies else "",
            stage_policies,
            style=["([", "])"],
        )
        stages.append(element)

        # Whatever comes next hangs off this stage
        parent_elements = [element]

        # This adds connections for policy denies, but retroactively, as we can't really
        # look ahead
        if s_index > 0:
            last_stage: DiagramElement = stages[s_index - 1]
            # Connect each source of the previous stage to this stage
            for source in last_stage.source or []:
                elements.append(
                    DiagramElement(
                        element.identifier,
                        element.description,
                        _("Policy denied"),
                        [source],
                        style=element.style,
                    )
                )

    if stages:
        elements.append(
            DiagramElement(
                "done",
                _("End of the flow"),
                "",
                [stages[-1]],
                style=["[[", "]]"],
            ),
        )
    return stages + elements
Collect all stages
def get_flow_auth_requirement(self) -> list[DiagramElement]:
    """Describe the flow's authentication requirement as diagram elements.

    Returns an empty list when the flow's requirement is
    ``FlowAuthenticationRequirement.NONE``. Otherwise returns the requirement element,
    the end element reached when the requirement is not fulfilled, and a connection
    to ``flow_start`` taken when it is fulfilled.
    """
    # Nothing to draw for flows without a requirement
    if self.flow.authentication == FlowAuthenticationRequirement.NONE:
        return []

    requirement = DiagramElement(
        "flow_auth_requirement",
        _("Flow authentication requirement") + "\n" + self.flow.authentication,
    )
    not_fulfilled = DiagramElement(
        "done",
        _("End of the flow"),
        _("Requirement not fulfilled"),
        [requirement],
        style=["[[", "]]"],
    )
    # Carries a literal "placeholder" label; only the connection to flow_start is of interest
    fulfilled = DiagramElement(
        "flow_start",
        "placeholder",
        _("Requirement fulfilled"),
        source=[requirement],
    )
    return [requirement, not_fulfilled, fulfilled]
Get flow authentication requirement
def
build(self) -> str:
def build(self) -> str:
    """Assemble the whole chart and return it as newline-joined text.

    Order of the output: the ``graph TD`` header, the authentication requirement,
    the pre-flow policies (only when there are any), the flow node, and finally the
    stages -- or a direct connection to the end element when the flow has no stages.
    """
    end_style = ["[[", "]]"]
    chart = ["graph TD", *self.get_flow_auth_requirement()]

    pre_flow = DiagramElement("flow_pre", _("Pre-flow policies"), style=["[[", "]]"])
    flow_policies = self.get_flow_policies([pre_flow])
    has_flow_policies = bool(flow_policies)
    if has_flow_policies:
        denied = DiagramElement(
            "done",
            _("End of the flow"),
            _("Policy denied"),
            flow_policies,
            style=list(end_style),
        )
        chart += [pre_flow, *flow_policies, denied]

    # Without pre-flow policies the flow node gets no incoming arrow at all
    incoming = "" if has_flow_policies else None
    flow_element = DiagramElement(
        "flow_start",
        _("Flow") + "\n" + self.flow.name,
        incoming,
        source=flow_policies,
        style=["[[", "]]"],
    )
    chart.append(flow_element)

    stages = self.get_stages([flow_element])
    chart += stages
    if not stages:
        # A flow without stages ends right after the flow node
        chart.append(
            DiagramElement(
                "done",
                _("End of the flow"),
                "",
                [flow_element],
                style=list(end_style),
            )
        )
    return "\n".join(map(str, chart))
Build flowchart