diff --git a/authentik/providers/scim/clients/schema.py b/authentik/providers/scim/clients/schema.py index b9131f2470..fafe49197f 100644 --- a/authentik/providers/scim/clients/schema.py +++ b/authentik/providers/scim/clients/schema.py @@ -2,7 +2,7 @@ from enum import Enum -from pydantic import Field +from pydantic import AnyUrl, BaseModel, ConfigDict, Field from pydanticscim.group import Group as BaseGroup from pydanticscim.responses import PatchOperation as BasePatchOperation from pydanticscim.responses import PatchRequest as BasePatchRequest @@ -12,19 +12,95 @@ from pydanticscim.service_provider import ChangePassword, Filter, Patch, Sort from pydanticscim.service_provider import ( ServiceProviderConfiguration as BaseServiceProviderConfiguration, ) +from pydanticscim.user import AddressKind from pydanticscim.user import User as BaseUser SCIM_USER_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:User" SCIM_GROUP_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:Group" +class Address(BaseModel): + formatted: str | None = Field( + None, + description="The full mailing address, formatted for display " + "or use with a mailing label. This attribute MAY contain newlines.", + ) + streetAddress: str | None = Field( + None, + description="The full street address component, which may " + "include house number, street name, P.O. box, and multi-line " + "extended street address information. This attribute MAY contain newlines.", + ) + locality: str | None = Field(None, description="The city or locality component.") + region: str | None = Field(None, description="The state or region component.") + postalCode: str | None = Field(None, description="The zip code or postal code component.") + country: str | None = Field(None, description="The country name component.") + type: AddressKind | None = Field( + None, + description="A label indicating the attribute's function, e.g., 'work' or 'home'.", + ) + primary: bool | None = None + + +class Manager(BaseModel): + value: str | None = Field( + None, + description="The id of the SCIM resource representingthe User's manager. REQUIRED.", + ) + ref: AnyUrl | None = Field( + None, + alias="$ref", + description="The URI of the SCIM resource representing the User's manager. REQUIRED.", + ) + displayName: str | None = Field( + None, + description="The displayName of the User's manager. OPTIONAL and READ-ONLY.", + ) + + +class EnterpriseUser(BaseModel): + employeeNumber: str | None = Field( + None, + description="Numeric or alphanumeric identifier assigned to a person, " + "typically based on order of hire or association with anorganization.", + ) + costCenter: str | None = Field(None, description="Identifies the name of a cost center.") + organization: str | None = Field(None, description="Identifies the name of an organization.") + division: str | None = Field(None, description="Identifies the name of a division.") + department: str | None = Field( + None, + description="Numeric or alphanumeric identifier assigned to a person," + " typically based on order of hire or association with anorganization.", + ) + manager: Manager | None = Field( + None, + description="The User's manager. A complex type that optionally allows " + "service providers to represent organizational hierarchy by referencing" + " the 'id' attribute of another User.", + ) + + class User(BaseUser): """Modified User schema with added externalId field""" + model_config = ConfigDict(serialize_by_alias=True) + id: str | int | None = None schemas: list[str] = [SCIM_USER_SCHEMA] externalId: str | None = None meta: dict | None = None + addresses: list[Address] | None = Field( + None, + description=( + "A physical mailing address for this User. Canonical type " + "values of 'work', 'home', and 'other'." + ), + ) + enterprise_user: EnterpriseUser | None = Field( + default=None, + alias="urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + serialization_alias="urn:ietf:params:scim:schemas:extension:enterprise:2.0:User", + ) class Group(BaseGroup): @@ -92,7 +168,7 @@ class PatchOperation(BasePatchOperation): """PatchOperation with optional path""" op: PatchOp - path: str | None + path: str | None = None class SCIMError(BaseSCIMError): diff --git a/authentik/sources/scim/api/groups.py b/authentik/sources/scim/api/groups.py index 8d7a6ccfe3..1fc41ef76b 100644 --- a/authentik/sources/scim/api/groups.py +++ b/authentik/sources/scim/api/groups.py @@ -18,6 +18,7 @@ class SCIMSourceGroupSerializer(SourceSerializer): model = SCIMSourceGroup fields = [ "id", + "external_id", "group", "group_obj", "source", @@ -31,5 +32,5 @@ class SCIMSourceGroupViewSet(UsedByMixin, ModelViewSet): queryset = SCIMSourceGroup.objects.all().select_related("group") serializer_class = SCIMSourceGroupSerializer filterset_fields = ["source__slug", "group__name", "group__group_uuid"] - search_fields = ["source__slug", "group__name", "attributes"] + search_fields = ["source__slug", "group__name", "attributes", "external_id"] ordering = ["group__name"] diff --git a/authentik/sources/scim/api/users.py b/authentik/sources/scim/api/users.py index e50af1786a..b2425aa2a7 100644 --- a/authentik/sources/scim/api/users.py +++ b/authentik/sources/scim/api/users.py @@ -18,6 +18,7 @@ class SCIMSourceUserSerializer(SourceSerializer): model = SCIMSourceUser fields = [ "id", + "external_id", "user", "user_obj", "source", @@ -31,5 +32,5 @@ class SCIMSourceUserViewSet(UsedByMixin, ModelViewSet): queryset = SCIMSourceUser.objects.all().select_related("user") serializer_class = SCIMSourceUserSerializer filterset_fields = ["source__slug", "user__username", "user__id"] - search_fields = ["source__slug", "user__username", "attributes"] + search_fields = ["source__slug", "user__username", "attributes", "user__uuid", "external_id"] ordering = ["user__username"] diff --git a/authentik/sources/scim/constants.py b/authentik/sources/scim/constants.py new file mode 100644 index 0000000000..9acb405926 --- /dev/null +++ b/authentik/sources/scim/constants.py @@ -0,0 +1,4 @@ +SCIM_URN_SCHEMA = "urn:ietf:params:scim:schemas:core:2.0:Schema" +SCIM_URN_GROUP = "urn:ietf:params:scim:schemas:core:2.0:Group" +SCIM_URN_USER = "urn:ietf:params:scim:schemas:core:2.0:User" +SCIM_URN_USER_ENTERPRISE = "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" diff --git a/authentik/sources/scim/errors.py b/authentik/sources/scim/errors.py deleted file mode 100644 index 76c3f3a017..0000000000 --- a/authentik/sources/scim/errors.py +++ /dev/null @@ -1,8 +0,0 @@ -"""SCIM Errors""" - -from authentik.lib.sentry import SentryIgnoredException - - -class PatchError(SentryIgnoredException): - """Error raised within an atomic block when an error happened - so nothing is saved""" diff --git a/authentik/sources/scim/migrations/0003_alter_scimsourcegroup_unique_together_and_more.py b/authentik/sources/scim/migrations/0003_alter_scimsourcegroup_unique_together_and_more.py new file mode 100644 index 0000000000..c31fb67618 --- /dev/null +++ b/authentik/sources/scim/migrations/0003_alter_scimsourcegroup_unique_together_and_more.py @@ -0,0 +1,98 @@ +# Generated by Django 5.1.11 on 2025-07-13 01:07 + +import uuid +from django.db import migrations, models +from django.apps.registry import Apps + +from django.db.backends.base.schema import BaseDatabaseSchemaEditor + + +def migrate_ext_id(apps: Apps, schema_editor: BaseDatabaseSchemaEditor): + SCIMSourceUser = apps.get_model("authentik_sources_scim", "SCIMSourceUser") + SCIMSourceGroup = apps.get_model("authentik_sources_scim", "SCIMSourceGroup") + db_alias = schema_editor.connection.alias + for user in SCIMSourceUser.objects.using(db_alias).all(): + user.external_id = user.id + user.save(update_fields=["external_id"]) + for group in SCIMSourceGroup.objects.using(db_alias).all(): + group.external_id = group.id + group.save(update_fields=["external_id"]) + + +class Migration(migrations.Migration): + + dependencies = [ + ("authentik_sources_scim", "0002_scimsourcepropertymapping"), + ] + + operations = [ + migrations.AlterUniqueTogether( + name="scimsourcegroup", + unique_together=set(), + ), + migrations.AlterUniqueTogether( + name="scimsourceuser", + unique_together=set(), + ), + migrations.AddField( + model_name="scimsourcegroup", + name="external_id", + field=models.TextField(default=None, null=True), + preserve_default=False, + ), + migrations.AddField( + model_name="scimsourceuser", + name="external_id", + field=models.TextField(default=None, null=True), + preserve_default=False, + ), + migrations.AlterUniqueTogether( + name="scimsourcegroup", + unique_together={("external_id", "source")}, + ), + migrations.AlterUniqueTogether( + name="scimsourceuser", + unique_together={("external_id", "source")}, + ), + migrations.RunPython(migrate_ext_id, migrations.RunPython.noop), + migrations.AlterField( + model_name="scimsourcegroup", + name="external_id", + field=models.TextField(), + preserve_default=False, + ), + migrations.AlterField( + model_name="scimsourceuser", + name="external_id", + field=models.TextField(), + preserve_default=False, + ), + migrations.AddIndex( + model_name="scimsourcegroup", + index=models.Index(fields=["external_id"], name="authentik_s_externa_05e346_idx"), + ), + migrations.AddIndex( + model_name="scimsourceuser", + index=models.Index(fields=["external_id"], name="authentik_s_externa_4bd760_idx"), + ), + migrations.AlterField( + model_name="scimsourcegroup", + name="id", + field=models.TextField(default=uuid.uuid4, primary_key=True, serialize=False), + ), + migrations.AlterField( + model_name="scimsourceuser", + name="id", + field=models.TextField(default=uuid.uuid4, primary_key=True, serialize=False), + ), + migrations.AddField( + model_name="scimsourcegroup", + name="last_update", + field=models.DateTimeField(auto_now=True), + ), + migrations.AddField( + model_name="scimsourceuser", + name="last_update", + field=models.DateTimeField(auto_now=True), + ), + ] diff --git a/authentik/sources/scim/models.py b/authentik/sources/scim/models.py index dc05c57aea..5f2d147c90 100644 --- a/authentik/sources/scim/models.py +++ b/authentik/sources/scim/models.py @@ -1,6 +1,7 @@ """SCIM Source""" from typing import Any +from uuid import uuid4 from django.db import models from django.templatetags.static import static @@ -103,10 +104,12 @@ class SCIMSourcePropertyMapping(PropertyMapping): class SCIMSourceUser(SerializerModel): """Mapping of a user and source to a SCIM user ID""" - id = models.TextField(primary_key=True) + id = models.TextField(primary_key=True, default=uuid4) + external_id = models.TextField() user = models.ForeignKey(User, on_delete=models.CASCADE) source = models.ForeignKey(SCIMSource, on_delete=models.CASCADE) attributes = models.JSONField(default=dict) + last_update = models.DateTimeField(auto_now=True) @property def serializer(self) -> BaseSerializer: @@ -115,7 +118,10 @@ class SCIMSourceUser(SerializerModel): return SCIMSourceUserSerializer class Meta: - unique_together = (("id", "user", "source"),) + unique_together = (("external_id", "source"),) + indexes = [ + models.Index(fields=["external_id"]), + ] def __str__(self) -> str: return f"SCIM User {self.user_id} to {self.source_id}" @@ -124,10 +130,12 @@ class SCIMSourceUser(SerializerModel): class SCIMSourceGroup(SerializerModel): """Mapping of a group and source to a SCIM user ID""" - id = models.TextField(primary_key=True) + id = models.TextField(primary_key=True, default=uuid4) + external_id = models.TextField() group = models.ForeignKey(Group, on_delete=models.CASCADE) source = models.ForeignKey(SCIMSource, on_delete=models.CASCADE) attributes = models.JSONField(default=dict) + last_update = models.DateTimeField(auto_now=True) @property def serializer(self) -> BaseSerializer: @@ -136,7 +144,10 @@ class SCIMSourceGroup(SerializerModel): return SCIMSourceGroupSerializer class Meta: - unique_together = (("id", "group", "source"),) + unique_together = (("external_id", "source"),) + indexes = [ + models.Index(fields=["external_id"]), + ] def __str__(self) -> str: return f"SCIM Group {self.group_id} to {self.source_id}" diff --git a/authentik/sources/scim/patch/__init__.py b/authentik/sources/scim/patch/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/authentik/sources/scim/patch/lexer.py b/authentik/sources/scim/patch/lexer.py new file mode 100644 index 0000000000..81cb1a5617 --- /dev/null +++ b/authentik/sources/scim/patch/lexer.py @@ -0,0 +1,180 @@ +from dataclasses import dataclass +from enum import Enum + +from authentik.sources.scim.constants import ( + SCIM_URN_GROUP, + SCIM_URN_SCHEMA, + SCIM_URN_USER, + SCIM_URN_USER_ENTERPRISE, +) + + +# Token types for SCIM path parsing +class TokenType(Enum): + ATTRIBUTE = "ATTRIBUTE" + DOT = "DOT" + LBRACKET = "LBRACKET" + RBRACKET = "RBRACKET" + LPAREN = "LPAREN" + RPAREN = "RPAREN" + STRING = "STRING" + NUMBER = "NUMBER" + BOOLEAN = "BOOLEAN" + NULL = "NULL" + OPERATOR = "OPERATOR" + AND = "AND" + OR = "OR" + NOT = "NOT" + EOF = "EOF" + + +@dataclass +class Token: + type: TokenType + value: str + position: int = 0 + + +class SCIMPathLexer: + """Lexer for SCIM paths and filter expressions""" + + OPERATORS = ["eq", "ne", "co", "sw", "ew", "gt", "lt", "ge", "le", "pr"] + + def __init__(self, text: str): + self.schema_urns = [ + SCIM_URN_SCHEMA, + SCIM_URN_GROUP, + SCIM_URN_USER, + SCIM_URN_USER_ENTERPRISE, + ] + self.text = text + self.pos = 0 + self.current_char = self.text[self.pos] if self.pos < len(self.text) else None + + def advance(self): + """Move to next character""" + self.pos += 1 + self.current_char = self.text[self.pos] if self.pos < len(self.text) else None + + def skip_whitespace(self): + """Skip whitespace characters""" + while self.current_char and self.current_char.isspace(): + self.advance() + + def read_string(self, quote_char): + """Read a quoted string""" + value = "" + self.advance() # Skip opening quote + + while self.current_char and self.current_char != quote_char: + if self.current_char == "\\": + self.advance() + if self.current_char: + value += self.current_char + self.advance() + else: + value += self.current_char + self.advance() + + if self.current_char == quote_char: + self.advance() # Skip closing quote + + return value + + def read_number(self): + """Read a number (integer or float)""" + value = "" + while self.current_char and (self.current_char.isdigit() or self.current_char == "."): + value += self.current_char + self.advance() + return value + + def read_identifier(self): + """Read an identifier (attribute name or operator) - supports URN format""" + value = "" + while self.current_char and (self.current_char.isalnum() or self.current_char in "_-:"): + value += self.current_char + self.advance() + # If the identifier value so far is a schema URN, take that as the identifier and + # treat the next part as a sub_attribute + if value in self.schema_urns: + self.current_char = "." + return value + + # Handle dots within URN identifiers (like "2.0") + # A dot is part of the identifier if it's followed by a digit + if ( + self.current_char == "." + and self.pos + 1 < len(self.text) + and self.text[self.pos + 1].isdigit() + ): + value += self.current_char + self.advance() + # Continue reading digits after the dot + while self.current_char and self.current_char.isdigit(): + value += self.current_char + self.advance() + + return value + + def get_next_token(self) -> Token: # noqa PLR0911 + """Get the next token from the input""" + while self.current_char: + if self.current_char.isspace(): + self.skip_whitespace() + continue + + if self.current_char == ".": + self.advance() + return Token(TokenType.DOT, ".") + + if self.current_char == "[": + self.advance() + return Token(TokenType.LBRACKET, "[") + + if self.current_char == "]": + self.advance() + return Token(TokenType.RBRACKET, "]") + + if self.current_char == "(": + self.advance() + return Token(TokenType.LPAREN, "(") + + if self.current_char == ")": + self.advance() + return Token(TokenType.RPAREN, ")") + + if self.current_char in "\"'": + quote_char = self.current_char + value = self.read_string(quote_char) + return Token(TokenType.STRING, value) + + if self.current_char.isdigit(): + value = self.read_number() + return Token(TokenType.NUMBER, value) + + if self.current_char.isalpha() or self.current_char == "_": + value = self.read_identifier() + + # Check for special keywords + if value.lower() == "true": + return Token(TokenType.BOOLEAN, True) + elif value.lower() == "false": + return Token(TokenType.BOOLEAN, False) + elif value.lower() == "null": + return Token(TokenType.NULL, None) + elif value.lower() == "and": + return Token(TokenType.AND, "and") + elif value.lower() == "or": + return Token(TokenType.OR, "or") + elif value.lower() == "not": + return Token(TokenType.NOT, "not") + elif value.lower() in self.OPERATORS: + return Token(TokenType.OPERATOR, value.lower()) + else: + return Token(TokenType.ATTRIBUTE, value) + + # Skip unknown characters + self.advance() + + return Token(TokenType.EOF, "") diff --git a/authentik/sources/scim/patch/parser.py b/authentik/sources/scim/patch/parser.py new file mode 100644 index 0000000000..67abd532a8 --- /dev/null +++ b/authentik/sources/scim/patch/parser.py @@ -0,0 +1,131 @@ +from typing import Any + +from authentik.sources.scim.patch.lexer import SCIMPathLexer, TokenType + + +class SCIMPathParser: + """Parser for SCIM paths including filter expressions""" + + def __init__(self): + self.lexer = None + self.current_token = None + + def parse_path(self, path: str | None) -> list[dict[str, Any]]: + """Parse a SCIM path into components""" + self.lexer = SCIMPathLexer(path) + self.current_token = self.lexer.get_next_token() + + components = [] + + while self.current_token.type != TokenType.EOF: + component = self._parse_path_component() + if component: + components.append(component) + + return components + + def _parse_path_component(self) -> dict[str, Any] | None: + """Parse a single path component""" + if self.current_token.type != TokenType.ATTRIBUTE: + return None + + attribute = self.current_token.value + self._consume(TokenType.ATTRIBUTE) + + filter_expr = None + sub_attribute = None + + # Check for filter expression + if self.current_token.type == TokenType.LBRACKET: + self._consume(TokenType.LBRACKET) + filter_expr = self._parse_filter_expression() + self._consume(TokenType.RBRACKET) + + # Check for sub-attribute + if self.current_token.type == TokenType.DOT: + self._consume(TokenType.DOT) + if self.current_token.type == TokenType.ATTRIBUTE: + sub_attribute = self.current_token.value + self._consume(TokenType.ATTRIBUTE) + + return {"attribute": attribute, "filter": filter_expr, "sub_attribute": sub_attribute} + + def _parse_filter_expression(self) -> dict[str, Any] | None: + """Parse a filter expression like 'primary eq true' or + 'type eq "work" and primary eq true'""" + return self._parse_or_expression() + + def _parse_or_expression(self) -> dict[str, Any] | None: + """Parse OR expressions""" + left = self._parse_and_expression() + + while self.current_token.type == TokenType.OR: + self._consume(TokenType.OR) + right = self._parse_and_expression() + left = {"type": "logical", "operator": "or", "left": left, "right": right} + + return left + + def _parse_and_expression(self) -> dict[str, Any] | None: + """Parse AND expressions""" + left = self._parse_primary_expression() + + while self.current_token.type == TokenType.AND: + self._consume(TokenType.AND) + right = self._parse_primary_expression() + left = {"type": "logical", "operator": "and", "left": left, "right": right} + + return left + + def _parse_primary_expression(self) -> dict[str, Any] | None: + """Parse primary expressions (attribute operator value)""" + if self.current_token.type == TokenType.LPAREN: + self._consume(TokenType.LPAREN) + expr = self._parse_or_expression() + self._consume(TokenType.RPAREN) + return expr + + if self.current_token.type == TokenType.NOT: + self._consume(TokenType.NOT) + expr = self._parse_primary_expression() + return {"type": "logical", "operator": "not", "operand": expr} + + if self.current_token.type != TokenType.ATTRIBUTE: + return None + + attribute = self.current_token.value + self._consume(TokenType.ATTRIBUTE) + + if self.current_token.type != TokenType.OPERATOR: + return None + + operator = self.current_token.value + self._consume(TokenType.OPERATOR) + + # Parse value + value = None + if self.current_token.type == TokenType.STRING: + value = self.current_token.value + self._consume(TokenType.STRING) + elif self.current_token.type == TokenType.NUMBER: + value = ( + float(self.current_token.value) + if "." in self.current_token.value + else int(self.current_token.value) + ) + self._consume(TokenType.NUMBER) + elif self.current_token.type == TokenType.BOOLEAN: + value = self.current_token.value + self._consume(TokenType.BOOLEAN) + elif self.current_token.type == TokenType.NULL: + value = None + self._consume(TokenType.NULL) + + return {"type": "comparison", "attribute": attribute, "operator": operator, "value": value} + + def _consume(self, expected_type: TokenType): + """Consume a token of the expected type""" + if self.current_token.type == expected_type: + self.current_token = self.lexer.get_next_token() + else: + raise ValueError(f"Expected {expected_type}, got {self.current_token.type}") diff --git a/authentik/sources/scim/patch/processor.py b/authentik/sources/scim/patch/processor.py new file mode 100644 index 0000000000..b9cf50233a --- /dev/null +++ b/authentik/sources/scim/patch/processor.py @@ -0,0 +1,246 @@ +from typing import Any + +from authentik.providers.scim.clients.schema import PatchOp, PatchOperation +from authentik.sources.scim.constants import SCIM_URN_USER_ENTERPRISE +from authentik.sources.scim.patch.parser import SCIMPathParser + + +class SCIMPatchProcessor: + """Processes SCIM patch operations on Python dictionaries""" + + def __init__(self): + self.parser = SCIMPathParser() + + def apply_patches(self, data: dict[str, Any], patches: list[PatchOperation]) -> dict[str, Any]: + """Apply a list of patch operations to the data""" + result = data.copy() + + for _patch in patches: + patch = PatchOperation.model_validate(_patch) + if patch.path is None: + # Handle operations with no path - value contains attribute paths as keys + self._apply_bulk_operation(result, patch.op, patch.value) + elif patch.op == PatchOp.add: + self._apply_add(result, patch.path, patch.value) + elif patch.op == PatchOp.remove: + self._apply_remove(result, patch.path) + elif patch.op == PatchOp.replace: + self._apply_replace(result, patch.path, patch.value) + + return result + + def _apply_bulk_operation( + self, data: dict[str, Any], operation: PatchOp, value: dict[str, Any] + ): + """Apply bulk operations when path is None""" + if not isinstance(value, dict): + return + for path, val in value.items(): + if operation == PatchOp.add: + self._apply_add(data, path, val) + elif operation == PatchOp.remove: + self._apply_remove(data, path) + elif operation == PatchOp.replace: + self._apply_replace(data, path, val) + + def _apply_add(self, data: dict[str, Any], path: str, value: Any): + """Apply ADD operation""" + components = self.parser.parse_path(path) + + if len(components) == 1 and not components[0]["filter"]: + # Simple path + attr = components[0]["attribute"] + if components[0]["sub_attribute"]: + if attr not in data: + data[attr] = {} + # Somewhat hacky workaround for the manager attribute of the enterprise schema + # ideally we'd do this based on the schema + if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager": + data[attr][components[0]["sub_attribute"]] = {"value": value} + else: + data[attr][components[0]["sub_attribute"]] = value + elif attr in data: + data[attr].append(value) + else: + data[attr] = value + else: + # Complex path with filters + self._navigate_and_modify(data, components, value, "add") + + def _apply_remove(self, data: dict[str, Any], path: str): + """Apply REMOVE operation""" + components = self.parser.parse_path(path) + + if len(components) == 1 and not components[0]["filter"]: + # Simple path + attr = components[0]["attribute"] + if components[0]["sub_attribute"]: + if attr in data and isinstance(data[attr], dict): + data[attr].pop(components[0]["sub_attribute"], None) + else: + data.pop(attr, None) + else: + # Complex path with filters + self._navigate_and_modify(data, components, None, "remove") + + def _apply_replace(self, data: dict[str, Any], path: str, value: Any): + """Apply REPLACE operation""" + components = self.parser.parse_path(path) + + if len(components) == 1 and not components[0]["filter"]: + # Simple path + attr = components[0]["attribute"] + if components[0]["sub_attribute"]: + if attr not in data: + data[attr] = {} + # Somewhat hacky workaround for the manager attribute of the enterprise schema + # ideally we'd do this based on the schema + if attr == SCIM_URN_USER_ENTERPRISE and components[0]["sub_attribute"] == "manager": + data[attr][components[0]["sub_attribute"]] = {"value": value} + else: + data[attr][components[0]["sub_attribute"]] = value + else: + data[attr] = value + else: + # Complex path with filters + self._navigate_and_modify(data, components, value, "replace") + + def _navigate_and_modify( # noqa PLR0912 + self, data: dict[str, Any], components: list[dict[str, Any]], value: Any, operation: str + ): + """Navigate through complex paths and apply modifications""" + current = data + + for i, component in enumerate(components): + attr = component["attribute"] + filter_expr = component["filter"] + sub_attr = component["sub_attribute"] + + if filter_expr: + # Handle array with filter + if attr not in current: + if operation == "add": + current[attr] = [] + else: + return + + if not isinstance(current[attr], list): + return + + # Find matching items + matching_items = [] + for item in current[attr]: + if self._matches_filter(item, filter_expr): + matching_items.append(item) + + if not matching_items and operation == "add": + # Create new item if none match (only for simple comparison filters) + if filter_expr.get("type", "comparison") == "comparison": + new_item = {filter_expr["attribute"]: filter_expr["value"]} + current[attr].append(new_item) + matching_items = [new_item] + + # Apply operation to matching items + for item in matching_items: + if sub_attr: + if operation in {"add", "replace"}: + item[sub_attr] = value + elif operation == "remove": + item.pop(sub_attr, None) + elif operation in {"add", "replace"}: + if isinstance(value, dict): + item.update(value) + else: + # If value is not a dict, we can't merge it + pass + elif operation == "remove": + # Remove the entire item + if item in current[attr]: + current[attr].remove(item) + # Handle simple attribute + elif i == len(components) - 1: + # Last component + if sub_attr: + if attr not in current: + current[attr] = {} + if operation in {"add", "replace"}: + current[attr][sub_attr] = value + elif operation == "remove": + current[attr].pop(sub_attr, None) + elif operation in {"add", "replace"}: + current[attr] = value + elif operation == "remove": + current.pop(attr, None) + else: + # Navigate deeper + if attr not in current: + current[attr] = {} + current = current[attr] + + def _matches_filter(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool: + """Check if an item matches the filter expression""" + if not filter_expr: + return True + + filter_type = filter_expr.get("type", "comparison") + + if filter_type == "comparison": + return self._matches_comparison(item, filter_expr) + elif filter_type == "logical": + return self._matches_logical(item, filter_expr) + + return False + + def _matches_comparison( # noqa PLR0912 + self, item: dict[str, Any], filter_expr: dict[str, Any] + ) -> bool: + """Check if an item matches a comparison filter""" + attr = filter_expr["attribute"] + operator = filter_expr["operator"] + expected_value = filter_expr["value"] + + if attr not in item: + return False + + actual_value = item[attr] + + if operator == "eq": + return actual_value == expected_value + elif operator == "ne": + return actual_value != expected_value + elif operator == "co": + return str(expected_value) in str(actual_value) + elif operator == "sw": + return str(actual_value).startswith(str(expected_value)) + elif operator == "ew": + return str(actual_value).endswith(str(expected_value)) + elif operator == "gt": + return actual_value > expected_value + elif operator == "lt": + return actual_value < expected_value + elif operator == "ge": + return actual_value >= expected_value + elif operator == "le": + return actual_value <= expected_value + elif operator == "pr": + return actual_value is not None + + return False + + def _matches_logical(self, item: dict[str, Any], filter_expr: dict[str, Any]) -> bool: + """Check if an item matches a logical filter expression""" + operator = filter_expr["operator"] + + if operator == "and": + left_result = self._matches_filter(item, filter_expr["left"]) + right_result = self._matches_filter(item, filter_expr["right"]) + return left_result and right_result + elif operator == "or": + left_result = self._matches_filter(item, filter_expr["left"]) + right_result = self._matches_filter(item, filter_expr["right"]) + return left_result or right_result + elif operator == "not": + operand_result = self._matches_filter(item, filter_expr["operand"]) + return not operand_result + + return False diff --git a/authentik/sources/scim/schemas/schema.json b/authentik/sources/scim/schemas/schema.json index dc9cc2c9a7..6cd8680e21 100644 --- a/authentik/sources/scim/schemas/schema.json +++ b/authentik/sources/scim/schemas/schema.json @@ -1101,17 +1101,6 @@ "returned": "default", "uniqueness": "none" }, - { - "name": "password", - "type": "string", - "multiValued": false, - "description": "The User's cleartext password. This attribute is intended to be used as a means to specify an initial\npassword when creating a new User or to reset an existing User's password.", - "required": false, - "caseExact": false, - "mutability": "writeOnly", - "returned": "never", - "uniqueness": "none" - }, { "name": "emails", "type": "complex", diff --git a/authentik/sources/scim/tests/test_groups.py b/authentik/sources/scim/tests/test_groups.py index df5683c0ca..abd29fd384 100644 --- a/authentik/sources/scim/tests/test_groups.py +++ b/authentik/sources/scim/tests/test_groups.py @@ -75,7 +75,9 @@ class TestSCIMGroups(APITestCase): HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", ) self.assertEqual(response.status_code, 201) - self.assertTrue(SCIMSourceGroup.objects.filter(source=self.source, id=ext_id).exists()) + self.assertTrue( + SCIMSourceGroup.objects.filter(source=self.source, external_id=ext_id).exists() + ) self.assertTrue( Event.objects.filter( action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username @@ -86,6 +88,7 @@ class TestSCIMGroups(APITestCase): """Test group create""" user = create_test_user() ext_id = generate_id() + name = generate_id() response = self.client.post( reverse( "authentik_sources_scim:v2-groups", @@ -95,7 +98,7 @@ class TestSCIMGroups(APITestCase): ), data=dumps( { - "displayName": generate_id(), + "displayName": name, "externalId": ext_id, "members": [{"value": str(user.uuid)}], } @@ -104,12 +107,22 @@ class TestSCIMGroups(APITestCase): HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", ) self.assertEqual(response.status_code, 201) - self.assertTrue(SCIMSourceGroup.objects.filter(source=self.source, id=ext_id).exists()) + connection = SCIMSourceGroup.objects.filter(source=self.source, external_id=ext_id).first() + self.assertIsNotNone(connection) self.assertTrue( Event.objects.filter( action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username ).exists() ) + connection.refresh_from_db() + self.assertEqual( + connection.attributes, + { + "displayName": name, + "externalId": ext_id, + "members": [{"value": str(user.uuid)}], + }, + ) def test_group_create_members_empty(self): """Test group create""" @@ -126,7 +139,9 @@ class TestSCIMGroups(APITestCase): HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", ) self.assertEqual(response.status_code, 201) - self.assertTrue(SCIMSourceGroup.objects.filter(source=self.source, id=ext_id).exists()) + self.assertTrue( + SCIMSourceGroup.objects.filter(source=self.source, external_id=ext_id).exists() + ) self.assertTrue( Event.objects.filter( action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username @@ -136,7 +151,9 @@ class TestSCIMGroups(APITestCase): def test_group_create_duplicate(self): """Test group create (duplicate)""" group = Group.objects.create(name=generate_id()) - existing = SCIMSourceGroup.objects.create(source=self.source, group=group, id=uuid4()) + existing = SCIMSourceGroup.objects.create( + source=self.source, group=group, external_id=uuid4() + ) ext_id = generate_id() response = self.client.post( reverse( @@ -165,7 +182,9 @@ class TestSCIMGroups(APITestCase): def test_group_update(self): """Test group update""" group = Group.objects.create(name=generate_id()) - existing = SCIMSourceGroup.objects.create(source=self.source, group=group, id=uuid4()) + existing = SCIMSourceGroup.objects.create( + source=self.source, group=group, external_id=uuid4() + ) ext_id = generate_id() response = self.client.put( reverse( @@ -205,12 +224,49 @@ class TestSCIMGroups(APITestCase): }, ) - def test_group_patch_add(self): + def test_group_patch_modify(self): + """Test group patch""" + group = Group.objects.create(name=generate_id()) + connection = SCIMSourceGroup.objects.create( + source=self.source, + group=group, + external_id=uuid4(), + attributes={"displayName": group.name, "members": []}, + ) + response = self.client.patch( + reverse( + "authentik_sources_scim:v2-groups", + kwargs={"source_slug": self.source.slug, "group_id": group.pk}, + ), + data=dumps( + { + "Operations": [ + { + "op": "Add", + "value": {"externalId": "d85051cb-0557-4aa1-98ca-51eabcee4d40"}, + } + ] + } + ), + content_type=SCIM_CONTENT_TYPE, + HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", + ) + self.assertEqual(response.status_code, 200, response.content) + connection = SCIMSourceGroup.objects.filter(id="d85051cb-0557-4aa1-98ca-51eabcee4d40") + self.assertIsNotNone(connection) + + def test_group_patch_member_add(self): """Test group patch""" user = create_test_user() - + other_user = create_test_user() group = Group.objects.create(name=generate_id()) - SCIMSourceGroup.objects.create(source=self.source, group=group, id=uuid4()) + group.users.add(other_user) + connection = SCIMSourceGroup.objects.create( + source=self.source, + group=group, + external_id=uuid4(), + attributes={"displayName": group.name, "members": [{"value": str(other_user.uuid)}]}, + ) response = self.client.patch( reverse( "authentik_sources_scim:v2-groups", @@ -222,7 +278,7 @@ class TestSCIMGroups(APITestCase): { "op": "Add", "path": "members", - "value": {"value": str(user.uuid)}, + "value": [{"value": str(user.uuid)}], } ] } @@ -230,16 +286,33 @@ class TestSCIMGroups(APITestCase): content_type=SCIM_CONTENT_TYPE, HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", ) - self.assertEqual(response.status_code, second=200) + self.assertEqual(response.status_code, 200, response.content) self.assertTrue(group.users.filter(pk=user.pk).exists()) + self.assertTrue(group.users.filter(pk=other_user.pk).exists()) + connection.refresh_from_db() + self.assertEqual( + connection.attributes, + { + "displayName": group.name, + "members": sorted( + [{"value": str(other_user.uuid)}, {"value": str(user.uuid)}], + key=lambda u: u["value"], + ), + }, + ) - def test_group_patch_remove(self): + def test_group_patch_member_remove(self): """Test group patch""" user = create_test_user() group = Group.objects.create(name=generate_id()) group.users.add(user) - SCIMSourceGroup.objects.create(source=self.source, group=group, id=uuid4()) + connection = SCIMSourceGroup.objects.create( + source=self.source, + group=group, + external_id=uuid4(), + attributes={"displayName": group.name, "members": []}, + ) response = self.client.patch( reverse( "authentik_sources_scim:v2-groups", @@ -251,7 +324,7 @@ class TestSCIMGroups(APITestCase): { "op": "remove", "path": "members", - "value": {"value": str(user.uuid)}, + "value": [{"value": str(user.uuid)}], } ] } @@ -259,13 +332,21 @@ class TestSCIMGroups(APITestCase): content_type=SCIM_CONTENT_TYPE, HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", ) - self.assertEqual(response.status_code, second=200) + self.assertEqual(response.status_code, 200, response.content) self.assertFalse(group.users.filter(pk=user.pk).exists()) + connection.refresh_from_db() + self.assertEqual( + connection.attributes, + { + "displayName": group.name, + "members": [], + }, + ) def test_group_delete(self): """Test group delete""" group = Group.objects.create(name=generate_id()) - SCIMSourceGroup.objects.create(source=self.source, group=group, id=uuid4()) + SCIMSourceGroup.objects.create(source=self.source, group=group, external_id=uuid4()) response = self.client.delete( reverse( "authentik_sources_scim:v2-groups", diff --git a/authentik/sources/scim/tests/test_lexer.py b/authentik/sources/scim/tests/test_lexer.py new file mode 100644 index 0000000000..5703d7c031 --- /dev/null +++ b/authentik/sources/scim/tests/test_lexer.py @@ -0,0 +1,510 @@ +from unittest import TestCase + +from authentik.sources.scim.constants import ( + SCIM_URN_GROUP, + SCIM_URN_SCHEMA, + SCIM_URN_USER, + SCIM_URN_USER_ENTERPRISE, +) +from authentik.sources.scim.patch.lexer import SCIMPathLexer, Token, TokenType + + +class TestTokenType(TestCase): + """Test TokenType enum""" + + def test_token_type_values(self): + """Test that all token types have correct values""" + self.assertEqual(TokenType.ATTRIBUTE.value, "ATTRIBUTE") + self.assertEqual(TokenType.DOT.value, "DOT") + self.assertEqual(TokenType.LBRACKET.value, "LBRACKET") + self.assertEqual(TokenType.RBRACKET.value, "RBRACKET") + self.assertEqual(TokenType.LPAREN.value, "LPAREN") + self.assertEqual(TokenType.RPAREN.value, "RPAREN") + self.assertEqual(TokenType.STRING.value, "STRING") + self.assertEqual(TokenType.NUMBER.value, "NUMBER") + self.assertEqual(TokenType.BOOLEAN.value, "BOOLEAN") + self.assertEqual(TokenType.NULL.value, "NULL") + self.assertEqual(TokenType.OPERATOR.value, "OPERATOR") + self.assertEqual(TokenType.AND.value, "AND") + self.assertEqual(TokenType.OR.value, "OR") + self.assertEqual(TokenType.NOT.value, "NOT") + self.assertEqual(TokenType.EOF.value, "EOF") + + +class TestToken(TestCase): + """Test Token dataclass""" + + def test_token_creation(self): + """Test token creation with all parameters""" + token = Token(TokenType.ATTRIBUTE, "userName", 5) + self.assertEqual(token.type, TokenType.ATTRIBUTE) + self.assertEqual(token.value, "userName") + self.assertEqual(token.position, 5) + + def test_token_creation_default_position(self): + """Test token creation with default position""" + token = Token(TokenType.DOT, ".") + self.assertEqual(token.type, TokenType.DOT) + self.assertEqual(token.value, ".") + self.assertEqual(token.position, 0) + + +class TestSCIMPathLexer(TestCase): + """Test SCIMPathLexer class""" + + def setUp(self): + """Set up test fixtures""" + self.simple_lexer = SCIMPathLexer("userName") + + def test_init(self): + """Test lexer initialization""" + lexer = SCIMPathLexer("test") + self.assertEqual(lexer.text, "test") + self.assertEqual(lexer.pos, 0) + self.assertEqual(lexer.current_char, "t") + self.assertIn(SCIM_URN_SCHEMA, lexer.schema_urns) + self.assertIn(SCIM_URN_GROUP, lexer.schema_urns) + self.assertIn(SCIM_URN_USER, lexer.schema_urns) + self.assertIn(SCIM_URN_USER_ENTERPRISE, lexer.schema_urns) + self.assertEqual( + lexer.OPERATORS, ["eq", "ne", "co", "sw", "ew", "gt", "lt", "ge", "le", "pr"] + ) + + def test_init_empty_string(self): + """Test lexer initialization with empty string""" + lexer = SCIMPathLexer("") + self.assertEqual(lexer.text, "") + self.assertEqual(lexer.pos, 0) + self.assertIsNone(lexer.current_char) + + def test_advance(self): + """Test advance method""" + lexer = SCIMPathLexer("abc") + self.assertEqual(lexer.current_char, "a") + + lexer.advance() + self.assertEqual(lexer.pos, 1) + self.assertEqual(lexer.current_char, "b") + + lexer.advance() + self.assertEqual(lexer.pos, 2) + self.assertEqual(lexer.current_char, "c") + + lexer.advance() + self.assertEqual(lexer.pos, 3) + self.assertIsNone(lexer.current_char) + + def test_skip_whitespace(self): + """Test skip_whitespace method""" + lexer = SCIMPathLexer(" \t\n abc") + lexer.skip_whitespace() + self.assertEqual(lexer.current_char, "a") + + def test_skip_whitespace_only_whitespace(self): + """Test skip_whitespace with only whitespace""" + lexer = SCIMPathLexer(" \t\n ") + lexer.skip_whitespace() + self.assertIsNone(lexer.current_char) + + def test_skip_whitespace_no_whitespace(self): + """Test skip_whitespace with no leading whitespace""" + lexer = SCIMPathLexer("abc") + original_pos = lexer.pos + lexer.skip_whitespace() + self.assertEqual(lexer.pos, original_pos) + self.assertEqual(lexer.current_char, "a") + + def test_read_string_double_quotes(self): + """Test reading double-quoted string""" + lexer = SCIMPathLexer('"hello world"') + result = lexer.read_string('"') + self.assertEqual(result, "hello world") + self.assertIsNone(lexer.current_char) # Should be at end + + def test_read_string_single_quotes(self): + """Test reading single-quoted string""" + lexer = SCIMPathLexer("'hello world'") + result = lexer.read_string("'") + self.assertEqual(result, "hello world") + self.assertIsNone(lexer.current_char) + + def test_read_string_with_escapes(self): + """Test reading string with escape characters""" + lexer = SCIMPathLexer('"hello \\"world\\""') + result = lexer.read_string('"') + self.assertEqual(result, 'hello "world"') + + def test_read_string_with_backslash_at_end(self): + """Test reading string with backslash at end""" + lexer = SCIMPathLexer('"hello\\"') + result = lexer.read_string('"') + self.assertEqual(result, 'hello"') + + def test_read_string_unclosed(self): + """Test reading unclosed string""" + lexer = SCIMPathLexer('"hello world') + result = lexer.read_string('"') + self.assertEqual(result, "hello world") + self.assertIsNone(lexer.current_char) + + def test_read_string_empty(self): + """Test reading empty string""" + lexer = SCIMPathLexer('""') + result = lexer.read_string('"') + self.assertEqual(result, "") + + def test_read_number_integer(self): + """Test reading integer number""" + lexer = SCIMPathLexer("123") + result = lexer.read_number() + self.assertEqual(result, "123") + self.assertIsNone(lexer.current_char) + + def test_read_number_float(self): + """Test reading float number""" + lexer = SCIMPathLexer("123.456") + result = lexer.read_number() + self.assertEqual(result, "123.456") + self.assertIsNone(lexer.current_char) + + def test_read_number_with_multiple_dots(self): + """Test reading number with multiple dots (invalid but handled)""" + lexer = SCIMPathLexer("123.456.789") + result = lexer.read_number() + self.assertEqual(result, "123.456.789") + self.assertIsNone(lexer.current_char) + + def test_read_number_starting_with_dot(self): + """Test reading number starting with dot""" + lexer = SCIMPathLexer(".123") + result = lexer.read_number() + self.assertEqual(result, ".123") + + def test_read_identifier_simple(self): + """Test reading simple identifier""" + lexer = SCIMPathLexer("userName") + result = lexer.read_identifier() + self.assertEqual(result, "userName") + self.assertIsNone(lexer.current_char) + + def test_read_identifier_with_underscore(self): + """Test reading identifier with underscore""" + lexer = SCIMPathLexer("user_name") + result = lexer.read_identifier() + self.assertEqual(result, "user_name") + + def test_read_identifier_with_hyphen(self): + """Test reading identifier with hyphen""" + lexer = SCIMPathLexer("user-name") + result = lexer.read_identifier() + self.assertEqual(result, "user-name") + + def test_read_identifier_with_colon(self): + """Test reading identifier with colon (URN format)""" + lexer = SCIMPathLexer("urn:ietf:params:scim:schemas:core:2.0:User") + result = lexer.read_identifier() + self.assertEqual(result, "urn:ietf:params:scim:schemas:core:2.0:User") + + def test_read_identifier_schema_urn(self): + """Test reading schema URN identifier""" + lexer = SCIMPathLexer(f"{SCIM_URN_USER}.userName") + result = lexer.read_identifier() + self.assertEqual(result, SCIM_URN_USER) + self.assertEqual(lexer.current_char, ".") # Should stop at dot and set current_char to dot + + def test_read_identifier_with_version_number(self): + """Test reading identifier with version number (dots followed by digits)""" + lexer = SCIMPathLexer("urn:ietf:params:scim:schemas:core:2.0:User") + result = lexer.read_identifier() + self.assertEqual(result, "urn:ietf:params:scim:schemas:core:2.0:User") + + def test_read_identifier_partial_urn_match(self): + """Test reading identifier that partially matches URN""" + lexer = SCIMPathLexer("urn:ietf:params:scim:schemas:core:2.0:CustomUser") + result = lexer.read_identifier() + self.assertEqual(result, "urn:ietf:params:scim:schemas:core:2.0:CustomUser") + + # Test get_next_token method + def test_get_next_token_dot(self): + """Test tokenizing dot""" + lexer = SCIMPathLexer(".") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.DOT) + self.assertEqual(token.value, ".") + + def test_get_next_token_lbracket(self): + """Test tokenizing left bracket""" + lexer = SCIMPathLexer("[") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.LBRACKET) + self.assertEqual(token.value, "[") + + def test_get_next_token_rbracket(self): + """Test tokenizing right bracket""" + lexer = SCIMPathLexer("]") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.RBRACKET) + self.assertEqual(token.value, "]") + + def test_get_next_token_lparen(self): + """Test tokenizing left parenthesis""" + lexer = SCIMPathLexer("(") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.LPAREN) + self.assertEqual(token.value, "(") + + def test_get_next_token_rparen(self): + """Test tokenizing right parenthesis""" + lexer = SCIMPathLexer(")") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.RPAREN) + self.assertEqual(token.value, ")") + + def test_get_next_token_string_double_quotes(self): + """Test tokenizing double-quoted string""" + lexer = SCIMPathLexer('"test string"') + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.STRING) + self.assertEqual(token.value, "test string") + + def test_get_next_token_string_single_quotes(self): + """Test tokenizing single-quoted string""" + lexer = SCIMPathLexer("'test string'") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.STRING) + self.assertEqual(token.value, "test string") + + def test_get_next_token_number_integer(self): + """Test tokenizing integer""" + lexer = SCIMPathLexer("123") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.NUMBER) + self.assertEqual(token.value, "123") + + def test_get_next_token_number_float(self): + """Test tokenizing float""" + lexer = SCIMPathLexer("123.45") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.NUMBER) + self.assertEqual(token.value, "123.45") + + def test_get_next_token_boolean_true(self): + """Test tokenizing boolean true""" + lexer = SCIMPathLexer("true") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.BOOLEAN) + self.assertTrue(token.value) + + def test_get_next_token_boolean_false(self): + """Test tokenizing boolean false""" + lexer = SCIMPathLexer("false") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.BOOLEAN) + self.assertFalse(token.value) + + def test_get_next_token_boolean_case_insensitive(self): + """Test tokenizing boolean with different cases""" + for value in ["TRUE", "True", "FALSE", "False"]: + with self.subTest(value=value): + lexer = SCIMPathLexer(value) + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.BOOLEAN) + + def test_get_next_token_null(self): + """Test tokenizing null""" + lexer = SCIMPathLexer("null") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.NULL) + self.assertIsNone(token.value) + + def test_get_next_token_null_case_insensitive(self): + """Test tokenizing null with different cases""" + for value in ["NULL", "Null"]: + with self.subTest(value=value): + lexer = SCIMPathLexer(value) + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.NULL) + + def test_get_next_token_and(self): + """Test tokenizing AND operator""" + lexer = SCIMPathLexer("and") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.AND) + self.assertEqual(token.value, "and") + + def test_get_next_token_or(self): + """Test tokenizing OR operator""" + lexer = SCIMPathLexer("or") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.OR) + self.assertEqual(token.value, "or") + + def test_get_next_token_not(self): + """Test tokenizing NOT operator""" + lexer = SCIMPathLexer("not") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.NOT) + self.assertEqual(token.value, "not") + + def test_get_next_token_operators(self): + """Test tokenizing all comparison operators""" + operators = ["eq", "ne", "co", "sw", "ew", "gt", "lt", "ge", "le", "pr"] + for op in operators: + with self.subTest(operator=op): + lexer = SCIMPathLexer(op) + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.OPERATOR) + self.assertEqual(token.value, op) + + def test_get_next_token_operators_case_insensitive(self): + """Test tokenizing operators with different cases""" + for op in ["EQ", "Eq", "NE", "Ne"]: + with self.subTest(operator=op): + lexer = SCIMPathLexer(op) + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.OPERATOR) + self.assertEqual(token.value, op.lower()) + + def test_get_next_token_attribute(self): + """Test tokenizing attribute name""" + lexer = SCIMPathLexer("userName") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.ATTRIBUTE) + self.assertEqual(token.value, "userName") + + def test_get_next_token_attribute_with_underscore(self): + """Test tokenizing attribute name with underscore""" + lexer = SCIMPathLexer("_userName") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.ATTRIBUTE) + self.assertEqual(token.value, "_userName") + + def test_get_next_token_eof(self): + """Test tokenizing end of file""" + lexer = SCIMPathLexer("") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.EOF) + self.assertEqual(token.value, "") + + def test_get_next_token_with_whitespace(self): + """Test tokenizing with leading whitespace""" + lexer = SCIMPathLexer(" userName") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.ATTRIBUTE) + self.assertEqual(token.value, "userName") + + def test_get_next_token_skip_unknown_characters(self): + """Test that unknown characters are skipped""" + lexer = SCIMPathLexer("@#$userName") + token = lexer.get_next_token() + self.assertEqual(token.type, TokenType.ATTRIBUTE) + self.assertEqual(token.value, "userName") + + def test_get_next_token_multiple_tokens(self): + """Test tokenizing multiple tokens in sequence""" + lexer = SCIMPathLexer("userName.givenName") + + token1 = lexer.get_next_token() + self.assertEqual(token1.type, TokenType.ATTRIBUTE) + self.assertEqual(token1.value, "userName") + + token2 = lexer.get_next_token() + self.assertEqual(token2.type, TokenType.DOT) + self.assertEqual(token2.value, ".") + + token3 = lexer.get_next_token() + self.assertEqual(token3.type, TokenType.ATTRIBUTE) + self.assertEqual(token3.value, "givenName") + + token4 = lexer.get_next_token() + self.assertEqual(token4.type, TokenType.EOF) + + def test_get_next_token_complex_filter(self): + """Test tokenizing complex filter expression""" + lexer = SCIMPathLexer('emails[type eq "work" and primary eq true]') + + tokens = [] + while True: + token = lexer.get_next_token() + tokens.append(token) + if token.type == TokenType.EOF: + break + + expected_types = [ + TokenType.ATTRIBUTE, # emails + TokenType.LBRACKET, # [ + TokenType.ATTRIBUTE, # type + TokenType.OPERATOR, # eq + TokenType.STRING, # "work" + TokenType.AND, # and + TokenType.ATTRIBUTE, # primary + TokenType.OPERATOR, # eq + TokenType.BOOLEAN, # true + TokenType.RBRACKET, # ] + TokenType.EOF, + ] + + self.assertEqual(len(tokens), len(expected_types)) + for token, expected_type in zip(tokens, expected_types, strict=False): + self.assertEqual(token.type, expected_type) + + def test_get_next_token_urn_attribute(self): + """Test tokenizing URN-based attribute""" + lexer = SCIMPathLexer(f"{SCIM_URN_USER}.userName") + + token1 = lexer.get_next_token() + self.assertEqual(token1.type, TokenType.ATTRIBUTE) + self.assertEqual(token1.value, SCIM_URN_USER) + + token2 = lexer.get_next_token() + self.assertEqual(token2.type, TokenType.DOT) + + token3 = lexer.get_next_token() + self.assertEqual(token3.type, TokenType.ATTRIBUTE) + self.assertEqual(token3.value, "userName") + + def test_get_next_token_enterprise_urn(self): + """Test tokenizing enterprise URN""" + lexer = SCIMPathLexer(f"{SCIM_URN_USER_ENTERPRISE}.manager") + + token1 = lexer.get_next_token() + self.assertEqual(token1.type, TokenType.ATTRIBUTE) + self.assertEqual(token1.value, SCIM_URN_USER_ENTERPRISE) + + token2 = lexer.get_next_token() + self.assertEqual(token2.type, TokenType.DOT) + + def test_lexer_state_after_eof(self): + """Test lexer state after reaching EOF""" + lexer = SCIMPathLexer("a") + + # Get first token + token1 = lexer.get_next_token() + self.assertEqual(token1.type, TokenType.ATTRIBUTE) + + # Get EOF token + token2 = lexer.get_next_token() + self.assertEqual(token2.type, TokenType.EOF) + + # Should continue returning EOF + token3 = lexer.get_next_token() + self.assertEqual(token3.type, TokenType.EOF) + + def test_read_identifier_edge_cases(self): + """Test read_identifier with edge cases""" + # Test identifier ending with colon + lexer = SCIMPathLexer("test:") + result = lexer.read_identifier() + self.assertEqual(result, "test:") + + # Test identifier with numbers + lexer = SCIMPathLexer("test123") + result = lexer.read_identifier() + self.assertEqual(result, "test123") + + def test_complex_urn_parsing(self): + """Test parsing complex URN with version numbers""" + urn = "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User" + lexer = SCIMPathLexer(urn) + result = lexer.read_identifier() + self.assertEqual(result, urn) diff --git a/authentik/sources/scim/tests/test_patch.py b/authentik/sources/scim/tests/test_patch.py new file mode 100644 index 0000000000..eb8069da05 --- /dev/null +++ b/authentik/sources/scim/tests/test_patch.py @@ -0,0 +1,1254 @@ +from unittest.mock import Mock, patch + +from rest_framework.test import APITestCase + +from authentik.providers.scim.clients.schema import PatchOp, PatchOperation +from authentik.sources.scim.constants import SCIM_URN_USER_ENTERPRISE +from authentik.sources.scim.patch.parser import SCIMPathParser +from authentik.sources.scim.patch.processor import SCIMPatchProcessor + + +class TestSCIMPatchProcessor(APITestCase): + + def setUp(self): + """Set up test fixtures""" + self.processor = SCIMPatchProcessor() + self.sample_data = { + "userName": "john.doe", + "name": {"givenName": "John", "familyName": "Doe"}, + "emails": [ + {"value": "john@example.com", "type": "work", "primary": True}, + {"value": "john.personal@example.com", "type": "personal"}, + ], + "active": True, + } + + def test_data(self): + user_data = { + "id": "user123", + "userName": "john.doe", + "name": {"formatted": "John Doe", "familyName": "Doe", "givenName": "John"}, + "emails": [ + {"value": "john.doe@example.com", "type": "work", "primary": True}, + {"value": "john.personal@example.com", "type": "personal", "primary": False}, + ], + "phoneNumbers": [ + {"value": "+1-555-123-4567", "type": "work", "primary": True}, + {"value": "+1-555-987-6543", "type": "mobile", "primary": False}, + ], + "addresses": [ + { + "streetAddress": "123 Work St", + "city": "Work City", + "type": "work", + "primary": True, + }, + { + "streetAddress": "456 Home Ave", + "city": "Home City", + "type": "home", + "primary": False, + }, + { + "streetAddress": "789 Other Rd", + "city": "Other City", + "type": "work", + "primary": False, + }, + ], + } + + # Create processor + processor = SCIMPatchProcessor() + + # Example patch operations + patches = [ + # Replace primary phone number + PatchOperation( + op=PatchOp.replace, + path="phoneNumbers[primary eq true].value", + value="+1-555-999-0000", + ), + # Add new email + PatchOperation( + op=PatchOp.add, + path="emails", + value={"value": "john.new@example.com", "type": "home", "primary": False}, + ), + # Update user's given name + PatchOperation(op=PatchOp.replace, path="name.givenName", value="Johnny"), + # Remove work email + PatchOperation(op=PatchOp.remove, path='emails[type eq "work"]'), + # Add with empty path, simple object + PatchOperation(op=PatchOp.add, path=None, value={"foo": "bar"}), + # Empty path with complex object + PatchOperation(op=PatchOp.add, path=None, value={"name.formatted": "formatted"}), + ] + result = processor.apply_patches(user_data, patches) + self.assertEqual( + result, + { + "id": "user123", + "userName": "john.doe", + "name": {"formatted": "formatted", "familyName": "Doe", "givenName": "Johnny"}, + "emails": [ + {"value": "john.personal@example.com", "type": "personal", "primary": False}, + {"value": "john.new@example.com", "type": "home", "primary": False}, + ], + "phoneNumbers": [ + {"value": "+1-555-999-0000", "type": "work", "primary": True}, + {"value": "+1-555-987-6543", "type": "mobile", "primary": False}, + ], + "addresses": [ + { + "streetAddress": "123 Work St", + "city": "Work City", + "type": "work", + "primary": True, + }, + { + "streetAddress": "456 Home Ave", + "city": "Home City", + "type": "home", + "primary": False, + }, + { + "streetAddress": "789 Other Rd", + "city": "Other City", + "type": "work", + "primary": False, + }, + ], + "foo": "bar", + }, + ) + + def test_parse(self): + test_paths = [ + { + "filter": "userName", + "components": [{"attribute": "userName", "filter": None, "sub_attribute": None}], + }, + { + "filter": "name.givenName", + "components": [{"attribute": "name", "filter": None, "sub_attribute": "givenName"}], + }, + { + "filter": "emails[primary eq true].value", + "components": [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "primary", + "operator": "eq", + "value": True, + }, + "sub_attribute": "value", + } + ], + }, + { + "filter": 'phoneNumbers[type eq "work"].value', + "components": [ + { + "attribute": "phoneNumbers", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "sub_attribute": "value", + } + ], + }, + { + "filter": 'addresses[type eq "work" and primary eq true].streetAddress', + "components": [ + { + "attribute": "addresses", + "filter": { + "type": "logical", + "operator": "and", + "left": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "right": { + "type": "comparison", + "attribute": "primary", + "operator": "eq", + "value": True, + }, + }, + "sub_attribute": "streetAddress", + } + ], + }, + { + "filter": f"{SCIM_URN_USER_ENTERPRISE}:manager", + "components": [ + { + "attribute": SCIM_URN_USER_ENTERPRISE, + "filter": None, + "sub_attribute": "manager", + } + ], + }, + ] + + for path in test_paths: + with self.subTest(path=path["filter"]): + parser = SCIMPathParser() + components = parser.parse_path(path["filter"]) + self.assertEqual(components, path["components"]) + + def test_init(self): + """Test processor initialization""" + processor = SCIMPatchProcessor() + self.assertIsNotNone(processor.parser) + + def test_apply_patches_empty_list(self): + """Test applying empty patch list returns unchanged data""" + result = self.processor.apply_patches(self.sample_data, []) + self.assertEqual(result, self.sample_data) + # Ensure original data is not modified + self.assertIsNot(result, self.sample_data) + + def test_apply_patches_with_validation(self): + """Test that patches are validated using PatchOperation.model_validate""" + with patch("authentik.sources.scim.patch.processor.PatchOperation") as mock_patch_op: + mock_patch_op.model_validate.return_value = Mock( + path="userName", op=PatchOp.replace, value="jane.doe" + ) + + patches = [{"op": "replace", "path": "userName", "value": "jane.doe"}] + self.processor.apply_patches(self.sample_data, patches) + + mock_patch_op.model_validate.assert_called_once() + + # Test ADD operations + def test_apply_add_simple_attribute(self): + """Test adding a simple attribute""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "title", "filter": None, "sub_attribute": None} + ] + + patches = [PatchOperation(op=PatchOp.add, path="title", value="Manager")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["title"], "Manager") + + def test_apply_add_sub_attribute(self): + """Test adding a sub-attribute""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "name", "filter": None, "sub_attribute": "middleName"} + ] + + patches = [PatchOperation(op=PatchOp.add, path="name.middleName", value="William")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["name"]["middleName"], "William") + + def test_apply_add_sub_attribute_new_parent(self): + """Test adding a sub-attribute when parent doesn't exist""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "address", "filter": None, "sub_attribute": "street"} + ] + + patches = [PatchOperation(op=PatchOp.add, path="address.street", value="123 Main St")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["address"]["street"], "123 Main St") + + def test_apply_add_enterprise_manager(self): + """Test adding enterprise manager attribute (special case)""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": SCIM_URN_USER_ENTERPRISE, "filter": None, "sub_attribute": "manager"} + ] + + patches = [ + PatchOperation( + op=PatchOp.add, path=f"{SCIM_URN_USER_ENTERPRISE}.manager", value="mgr123" + ) + ] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result[SCIM_URN_USER_ENTERPRISE]["manager"], {"value": "mgr123"}) + + def test_apply_add_to_existing_array(self): + """Test adding to an existing array attribute""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "emails", "filter": None, "sub_attribute": None} + ] + + new_email = {"value": "john.work@example.com", "type": "work"} + patches = [PatchOperation(op=PatchOp.add, path="emails", value=new_email)] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(len(result["emails"]), 3) + self.assertIn(new_email, result["emails"]) + + def test_apply_add_new_attribute_as_value(self): + """Test adding a new attribute that gets set as value (not array)""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "department", "filter": None, "sub_attribute": None} + ] + + patches = [PatchOperation(op=PatchOp.add, path="department", value="Engineering")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["department"], "Engineering") + + def test_apply_add_complex_path(self): + """Test adding with complex path (filters)""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + { + "attribute": "emails", + "filter": {"type": "comparison"}, + "sub_attribute": "verified", + } + ] + + patches = [ + PatchOperation(op=PatchOp.add, path='emails[type eq "work"].verified', value=True) + ] + + with patch.object(self.processor, "_navigate_and_modify") as mock_navigate: + self.processor.apply_patches(self.sample_data, patches) + mock_navigate.assert_called_once() + + # Test REMOVE operations + def test_apply_remove_simple_attribute(self): + """Test removing a simple attribute""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "active", "filter": None, "sub_attribute": None} + ] + + patches = [PatchOperation(op=PatchOp.remove, path="active")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertNotIn("active", result) + + def test_apply_remove_sub_attribute(self): + """Test removing a sub-attribute""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "name", "filter": None, "sub_attribute": "givenName"} + ] + + patches = [PatchOperation(op=PatchOp.remove, path="name.givenName")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertNotIn("givenName", result["name"]) + self.assertIn("familyName", result["name"]) # Other sub-attributes remain + + def test_apply_remove_sub_attribute_nonexistent_parent(self): + """Test removing sub-attribute when parent doesn't exist""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "nonexistent", "filter": None, "sub_attribute": "field"} + ] + + patches = [PatchOperation(op=PatchOp.remove, path="nonexistent.field")] + result = self.processor.apply_patches(self.sample_data, patches) + + # Should not raise error and data should be unchanged + self.assertEqual(result, self.sample_data) + + def test_apply_remove_nonexistent_attribute(self): + """Test removing a non-existent attribute (should not raise error)""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "nonexistent", "filter": None, "sub_attribute": None} + ] + + patches = [PatchOperation(op=PatchOp.remove, path="nonexistent")] + result = self.processor.apply_patches(self.sample_data, patches) + + # Should not raise error and data should be unchanged + self.assertEqual(result, self.sample_data) + + # Test REPLACE operations + def test_apply_replace_simple_attribute(self): + """Test replacing a simple attribute""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "userName", "filter": None, "sub_attribute": None} + ] + + patches = [PatchOperation(op=PatchOp.replace, path="userName", value="jane.doe")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["userName"], "jane.doe") + + def test_apply_replace_sub_attribute(self): + """Test replacing a sub-attribute""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "name", "filter": None, "sub_attribute": "givenName"} + ] + + patches = [PatchOperation(op=PatchOp.replace, path="name.givenName", value="Jane")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["name"]["givenName"], "Jane") + + def test_apply_replace_sub_attribute_new_parent(self): + """Test replacing sub-attribute when parent doesn't exist""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": "address", "filter": None, "sub_attribute": "city"} + ] + + patches = [PatchOperation(op=PatchOp.replace, path="address.city", value="New York")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["address"]["city"], "New York") + + def test_apply_replace_enterprise_manager(self): + """Test replacing enterprise manager attribute (special case)""" + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.return_value = [ + {"attribute": SCIM_URN_USER_ENTERPRISE, "filter": None, "sub_attribute": "manager"} + ] + + patches = [ + PatchOperation( + op=PatchOp.replace, + path=f"{SCIM_URN_USER_ENTERPRISE}.manager", + value="newmgr456", + ) + ] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result[SCIM_URN_USER_ENTERPRISE]["manager"], {"value": "newmgr456"}) + + # Test bulk operations (path is None) + def test_apply_bulk_add_operation(self): + """Test bulk add operation when path is None""" + patches = [ + PatchOperation( + op=PatchOp.add, path=None, value={"title": "Manager", "department": "IT"} + ) + ] + + with patch.object(self.processor, "_apply_add") as mock_add: + self.processor.apply_patches(self.sample_data, patches) + self.assertEqual(mock_add.call_count, 2) + + def test_apply_bulk_remove_operation(self): + """Test bulk remove operation when path is None""" + patches = [ + PatchOperation(op=PatchOp.remove, path=None, value={"active": None, "userName": None}) + ] + + with patch.object(self.processor, "_apply_remove") as mock_remove: + self.processor.apply_patches(self.sample_data, patches) + self.assertEqual(mock_remove.call_count, 2) + + def test_apply_bulk_replace_operation(self): + """Test bulk replace operation when path is None""" + patches = [ + PatchOperation( + op=PatchOp.replace, path=None, value={"userName": "jane.doe", "active": False} + ) + ] + + with patch.object(self.processor, "_apply_replace") as mock_replace: + self.processor.apply_patches(self.sample_data, patches) + self.assertEqual(mock_replace.call_count, 2) + + def test_apply_bulk_operation_invalid_value(self): + """Test bulk operation with non-dict value (should be ignored)""" + patches = [PatchOperation(op=PatchOp.add, path=None, value="invalid")] + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result, self.sample_data) + + # Test _navigate_and_modify method + def test_navigate_and_modify_with_filter_add_new_item(self): + """Test navigating with filter and adding new item""" + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "home", + }, + "sub_attribute": None, + } + ] + + new_email = {"value": "home@example.com", "type": "home"} + data_copy = self.sample_data.copy() + data_copy["emails"] = self.sample_data["emails"].copy() + + self.processor._navigate_and_modify(data_copy, components, new_email, "add") + + # Should add new email with type "home" + home_emails = [email for email in data_copy["emails"] if email.get("type") == "home"] + self.assertEqual(len(home_emails), 1) + + def test_navigate_and_modify_with_filter_modify_existing(self): + """Test navigating with filter and modifying existing item""" + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "sub_attribute": "verified", + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = [email.copy() for email in self.sample_data["emails"]] + + self.processor._navigate_and_modify(data_copy, components, True, "add") + + # Should add verified field to work email + work_email = next(email for email in data_copy["emails"] if email.get("type") == "work") + self.assertTrue(work_email["verified"]) + + def test_navigate_and_modify_remove_item(self): + """Test removing entire item with filter""" + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "personal", + }, + "sub_attribute": None, + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = [email.copy() for email in self.sample_data["emails"]] + original_count = len(data_copy["emails"]) + + self.processor._navigate_and_modify(data_copy, components, None, "remove") + + # Should remove personal email + self.assertEqual(len(data_copy["emails"]), original_count - 1) + personal_emails = [ + email for email in data_copy["emails"] if email.get("type") == "personal" + ] + self.assertEqual(len(personal_emails), 0) + + def test_navigate_and_modify_nonexistent_attribute_add(self): + """Test navigating to non-existent attribute for add operation""" + components = [ + { + "attribute": "phones", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "mobile", + }, + "sub_attribute": None, + } + ] + + data_copy = self.sample_data.copy() + self.processor._navigate_and_modify( + data_copy, components, {"value": "123-456-7890", "type": "mobile"}, "add" + ) + + # Should create new phones array + self.assertIn("phones", data_copy) + self.assertEqual(len(data_copy["phones"]), 1) + + def test_navigate_and_modify_nonexistent_attribute_remove(self): + """Test navigating to non-existent attribute for remove operation""" + components = [ + { + "attribute": "phones", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "mobile", + }, + "sub_attribute": None, + } + ] + + data_copy = self.sample_data.copy() + self.processor._navigate_and_modify(data_copy, components, None, "remove") + + # Should not create attribute or raise error + self.assertNotIn("phones", data_copy) + + # Test filter matching methods + def test_matches_filter_no_filter(self): + """Test matching with no filter (should return True)""" + item = {"type": "work"} + result = self.processor._matches_filter(item, None) + self.assertTrue(result) + + def test_matches_filter_empty_filter(self): + """Test matching with empty filter (should return True)""" + item = {"type": "work"} + result = self.processor._matches_filter(item, {}) + self.assertTrue(result) + + def test_matches_filter_unknown_type(self): + """Test matching with unknown filter type""" + item = {"type": "work"} + filter_expr = {"type": "unknown"} + result = self.processor._matches_filter(item, filter_expr) + self.assertFalse(result) + + def test_matches_comparison_eq(self): + """Test comparison filter with eq operator""" + item = {"type": "work", "primary": True} + filter_expr = {"type": "comparison", "attribute": "type", "operator": "eq", "value": "work"} + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_eq_false(self): + """Test comparison filter with eq operator (false case)""" + item = {"type": "work"} + filter_expr = { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "personal", + } + + result = self.processor._matches_comparison(item, filter_expr) + self.assertFalse(result) + + def test_matches_comparison_ne(self): + """Test comparison filter with ne operator""" + item = {"type": "work"} + filter_expr = { + "type": "comparison", + "attribute": "type", + "operator": "ne", + "value": "personal", + } + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_co(self): + """Test comparison filter with co (contains) operator""" + item = {"value": "john@example.com"} + filter_expr = { + "type": "comparison", + "attribute": "value", + "operator": "co", + "value": "example", + } + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_sw(self): + """Test comparison filter with sw (starts with) operator""" + item = {"value": "john@example.com"} + filter_expr = { + "type": "comparison", + "attribute": "value", + "operator": "sw", + "value": "john", + } + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_ew(self): + """Test comparison filter with ew (ends with) operator""" + item = {"value": "john@example.com"} + filter_expr = { + "type": "comparison", + "attribute": "value", + "operator": "ew", + "value": ".com", + } + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_gt(self): + """Test comparison filter with gt (greater than) operator""" + item = {"priority": 10} + filter_expr = {"type": "comparison", "attribute": "priority", "operator": "gt", "value": 5} + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_lt(self): + """Test comparison filter with lt (less than) operator""" + item = {"priority": 3} + filter_expr = {"type": "comparison", "attribute": "priority", "operator": "lt", "value": 5} + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_ge(self): + """Test comparison filter with ge (greater than or equal) operator""" + item = {"priority": 5} + filter_expr = {"type": "comparison", "attribute": "priority", "operator": "ge", "value": 5} + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_le(self): + """Test comparison filter with le (less than or equal) operator""" + item = {"priority": 5} + filter_expr = {"type": "comparison", "attribute": "priority", "operator": "le", "value": 5} + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_pr(self): + """Test comparison filter with pr (present) operator""" + item = {"value": "john@example.com"} + filter_expr = {"type": "comparison", "attribute": "value", "operator": "pr", "value": None} + + result = self.processor._matches_comparison(item, filter_expr) + self.assertTrue(result) + + def test_matches_comparison_pr_false(self): + """Test comparison filter with pr operator (false case)""" + item = {"value": None} + filter_expr = {"type": "comparison", "attribute": "value", "operator": "pr", "value": None} + + result = self.processor._matches_comparison(item, filter_expr) + self.assertFalse(result) + + def test_matches_comparison_missing_attribute(self): + """Test comparison filter with missing attribute""" + item = {"type": "work"} + filter_expr = { + "type": "comparison", + "attribute": "missing", + "operator": "eq", + "value": "test", + } + + result = self.processor._matches_comparison(item, filter_expr) + self.assertFalse(result) + + def test_matches_comparison_unknown_operator(self): + """Test comparison filter with unknown operator""" + item = {"type": "work"} + filter_expr = { + "type": "comparison", + "attribute": "type", + "operator": "unknown", + "value": "work", + } + + result = self.processor._matches_comparison(item, filter_expr) + self.assertFalse(result) + + def test_matches_logical_and_true(self): + """Test logical AND filter (true case)""" + item = {"type": "work", "primary": True} + filter_expr = { + "type": "logical", + "operator": "and", + "left": {"type": "comparison", "attribute": "type", "operator": "eq", "value": "work"}, + "right": { + "type": "comparison", + "attribute": "primary", + "operator": "eq", + "value": True, + }, + } + + result = self.processor._matches_logical(item, filter_expr) + self.assertTrue(result) + + def test_matches_logical_and_false(self): + """Test logical AND filter (false case)""" + item = {"type": "work", "primary": False} + filter_expr = { + "type": "logical", + "operator": "and", + "left": {"type": "comparison", "attribute": "type", "operator": "eq", "value": "work"}, + "right": { + "type": "comparison", + "attribute": "primary", + "operator": "eq", + "value": True, + }, + } + + result = self.processor._matches_logical(item, filter_expr) + self.assertFalse(result) + + def test_matches_logical_or_true(self): + """Test logical OR filter (true case)""" + item = {"type": "personal", "primary": True} + filter_expr = { + "type": "logical", + "operator": "or", + "left": {"type": "comparison", "attribute": "type", "operator": "eq", "value": "work"}, + "right": { + "type": "comparison", + "attribute": "primary", + "operator": "eq", + "value": True, + }, + } + + result = self.processor._matches_logical(item, filter_expr) + self.assertTrue(result) + + def test_matches_logical_or_false(self): + """Test logical OR filter (false case)""" + item = {"type": "personal", "primary": False} + filter_expr = { + "type": "logical", + "operator": "or", + "left": {"type": "comparison", "attribute": "type", "operator": "eq", "value": "work"}, + "right": { + "type": "comparison", + "attribute": "primary", + "operator": "eq", + "value": True, + }, + } + + result = self.processor._matches_logical(item, filter_expr) + self.assertFalse(result) + + def test_matches_logical_not_true(self): + """Test logical NOT filter (true case)""" + item = {"type": "personal"} + filter_expr = { + "type": "logical", + "operator": "not", + "operand": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + } + + result = self.processor._matches_logical(item, filter_expr) + self.assertTrue(result) + + def test_matches_logical_not_false(self): + """Test logical NOT filter (false case)""" + item = {"type": "work"} + filter_expr = { + "type": "logical", + "operator": "not", + "operand": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + } + + result = self.processor._matches_logical(item, filter_expr) + self.assertFalse(result) + + def test_matches_logical_unknown_operator(self): + """Test logical filter with unknown operator""" + item = {"type": "work"} + filter_expr = { + "type": "logical", + "operator": "unknown", + "left": {"type": "comparison", "attribute": "type", "operator": "eq", "value": "work"}, + } + + result = self.processor._matches_logical(item, filter_expr) + self.assertFalse(result) + + def test_multiple_patches_applied_sequentially(self): + """Test that multiple patches are applied in sequence""" + patches = [ + PatchOperation(op=PatchOp.add, path="title", value="Manager"), + PatchOperation(op=PatchOp.replace, path="userName", value="jane.doe"), + PatchOperation(op=PatchOp.remove, path="active"), + ] + + with patch.object(self.processor.parser, "parse_path") as mock_parse: + mock_parse.side_effect = [ + [{"attribute": "title", "filter": None, "sub_attribute": None}], + [{"attribute": "userName", "filter": None, "sub_attribute": None}], + [{"attribute": "active", "filter": None, "sub_attribute": None}], + ] + + result = self.processor.apply_patches(self.sample_data, patches) + + self.assertEqual(result["title"], "Manager") + self.assertEqual(result["userName"], "jane.doe") + self.assertNotIn("active", result) + + def test_navigate_and_modify_simple_attribute_last_component_add(self): + """Test navigating to simple attribute as last component with add operation""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "title", "filter": None, "sub_attribute": None}, + ] + + data_copy = self.sample_data.copy() + data_copy["profile"] = {} + + self.processor._navigate_and_modify(data_copy, components, "Senior Manager", "add") + + self.assertEqual(data_copy["profile"]["title"], "Senior Manager") + + def test_navigate_and_modify_simple_attribute_last_component_replace(self): + """Test navigating to simple attribute as last component with replace operation""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "title", "filter": None, "sub_attribute": None}, + ] + + data_copy = self.sample_data.copy() + data_copy["profile"] = {"title": "Manager"} + + self.processor._navigate_and_modify(data_copy, components, "Director", "replace") + + self.assertEqual(data_copy["profile"]["title"], "Director") + + def test_navigate_and_modify_simple_attribute_last_component_remove(self): + """Test navigating to simple attribute as last component with remove operation""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "title", "filter": None, "sub_attribute": None}, + ] + + data_copy = self.sample_data.copy() + data_copy["profile"] = {"title": "Manager", "department": "IT"} + + self.processor._navigate_and_modify(data_copy, components, None, "remove") + + self.assertNotIn("title", data_copy["profile"]) + self.assertIn("department", data_copy["profile"]) # Other attributes remain + + def test_navigate_and_modify_sub_attribute_last_component_add(self): + """Test navigating to sub-attribute as last component with add operation""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "address", "filter": None, "sub_attribute": "street"}, + ] + + data_copy = self.sample_data.copy() + data_copy["profile"] = {"address": {}} + + self.processor._navigate_and_modify(data_copy, components, "123 Main St", "add") + + self.assertEqual(data_copy["profile"]["address"]["street"], "123 Main St") + + def test_navigate_and_modify_sub_attribute_last_component_replace(self): + """Test navigating to sub-attribute as last component with replace operation""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "address", "filter": None, "sub_attribute": "street"}, + ] + + data_copy = self.sample_data.copy() + data_copy["profile"] = {"address": {"street": "456 Oak Ave"}} + + self.processor._navigate_and_modify(data_copy, components, "789 Pine Rd", "replace") + + self.assertEqual(data_copy["profile"]["address"]["street"], "789 Pine Rd") + + def test_navigate_and_modify_sub_attribute_last_component_remove(self): + """Test navigating to sub-attribute as last component with remove operation""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "address", "filter": None, "sub_attribute": "street"}, + ] + + data_copy = self.sample_data.copy() + data_copy["profile"] = {"address": {"street": "123 Main St", "city": "New York"}} + + self.processor._navigate_and_modify(data_copy, components, None, "remove") + + self.assertNotIn("street", data_copy["profile"]["address"]) + self.assertIn("city", data_copy["profile"]["address"]) # Other sub-attributes remain + + def test_navigate_and_modify_sub_attribute_parent_not_exists(self): + """Test navigating to sub-attribute when parent attribute doesn't exist""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "address", "filter": None, "sub_attribute": "street"}, + ] + + data_copy = self.sample_data.copy() + data_copy["profile"] = {} # address doesn't exist yet + + self.processor._navigate_and_modify(data_copy, components, "123 Main St", "add") + + self.assertEqual(data_copy["profile"]["address"]["street"], "123 Main St") + + def test_navigate_and_modify_deeper_navigation(self): + """Test navigating deeper through multiple levels without filters""" + components = [ + {"attribute": "organization", "filter": None, "sub_attribute": None}, + {"attribute": "department", "filter": None, "sub_attribute": None}, + {"attribute": "team", "filter": None, "sub_attribute": None}, + {"attribute": "name", "filter": None, "sub_attribute": None}, + ] + + data_copy = self.sample_data.copy() + + self.processor._navigate_and_modify(data_copy, components, "Engineering Team Alpha", "add") + + self.assertEqual( + data_copy["organization"]["department"]["team"]["name"], "Engineering Team Alpha" + ) + + def test_navigate_and_modify_deeper_navigation_partial_path_exists(self): + """Test navigating deeper when part of the path already exists""" + components = [ + {"attribute": "organization", "filter": None, "sub_attribute": None}, + {"attribute": "department", "filter": None, "sub_attribute": None}, + {"attribute": "budget", "filter": None, "sub_attribute": None}, + ] + + data_copy = self.sample_data.copy() + data_copy["organization"] = {"department": {"name": "IT"}} + + self.processor._navigate_and_modify(data_copy, components, 100000, "add") + + self.assertEqual(data_copy["organization"]["department"]["budget"], 100000) + self.assertEqual( + data_copy["organization"]["department"]["name"], "IT" + ) # Existing data preserved + + def test_navigate_and_modify_array_not_list_type(self): + """Test navigation when expected array attribute is not a list""" + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "sub_attribute": "verified", + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = "not_a_list" # Invalid type + + # Should return early without error + self.processor._navigate_and_modify(data_copy, components, True, "add") + + # Data should remain unchanged + self.assertEqual(data_copy["emails"], "not_a_list") + + def test_navigate_and_modify_update_matching_item_with_dict_value(self): + """Test updating matching item with dictionary value""" + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "sub_attribute": None, + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = [email.copy() for email in self.sample_data["emails"]] + + update_data = {"verified": True, "lastChecked": "2023-01-01"} + self.processor._navigate_and_modify(data_copy, components, update_data, "add") + + work_email = next(email for email in data_copy["emails"] if email.get("type") == "work") + self.assertTrue(work_email["verified"]) + self.assertEqual(work_email["lastChecked"], "2023-01-01") + # Original fields should still exist + self.assertEqual(work_email["value"], "john@example.com") + + def test_navigate_and_modify_update_matching_item_with_non_dict_value(self): + """Test updating matching item with non-dictionary value (should be ignored)""" + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "sub_attribute": None, + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = [email.copy() for email in self.sample_data["emails"]] + original_work_email = next( + email for email in data_copy["emails"] if email.get("type") == "work" + ).copy() + + # Try to update with non-dict value + self.processor._navigate_and_modify(data_copy, components, "string_value", "add") + + # Email should remain unchanged + work_email = next(email for email in data_copy["emails"] if email.get("type") == "work") + self.assertEqual(work_email, original_work_email) + + def test_navigate_and_modify_remove_entire_matching_item(self): + """Test removing entire matching item from array""" + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "personal", + }, + "sub_attribute": None, + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = [email.copy() for email in self.sample_data["emails"]] + original_count = len(data_copy["emails"]) + + self.processor._navigate_and_modify(data_copy, components, None, "remove") + + # Should remove the personal email + self.assertEqual(len(data_copy["emails"]), original_count - 1) + personal_emails = [ + email for email in data_copy["emails"] if email.get("type") == "personal" + ] + self.assertEqual(len(personal_emails), 0) + + # Work email should still exist + work_emails = [email for email in data_copy["emails"] if email.get("type") == "work"] + self.assertEqual(len(work_emails), 1) + + def test_navigate_and_modify_mixed_filters_and_simple_navigation(self): + """Test navigation with mix of filtered and simple components""" + # This test actually reveals a limitation in the current implementation + # The _navigate_and_modify method doesn't properly handle navigation + # after a filtered component. Let's test what actually happens. + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "sub_attribute": "verified", # Changed to test sub_attribute on filtered item + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = [email.copy() for email in self.sample_data["emails"]] + + self.processor._navigate_and_modify(data_copy, components, True, "add") + + work_email = next(email for email in data_copy["emails"] if email.get("type") == "work") + self.assertTrue(work_email["verified"]) + + def test_navigate_and_modify_simple_navigation_multiple_levels(self): + """Test simple navigation through multiple levels without filters""" + components = [ + {"attribute": "profile", "filter": None, "sub_attribute": None}, + {"attribute": "settings", "filter": None, "sub_attribute": None}, + {"attribute": "notifications", "filter": None, "sub_attribute": "email"}, + ] + + data_copy = self.sample_data.copy() + + self.processor._navigate_and_modify(data_copy, components, True, "add") + + self.assertTrue(data_copy["profile"]["settings"]["notifications"]["email"]) + + def test_navigate_and_modify_filter_then_simple_attribute_workaround(self): + """Test the actual behavior when we have filter followed by simple navigation""" + # Based on the code, after processing a filter, the method doesn't continue + # to navigate deeper. This test documents the current behavior. + components = [ + { + "attribute": "emails", + "filter": { + "type": "comparison", + "attribute": "type", + "operator": "eq", + "value": "work", + }, + "sub_attribute": None, + } + ] + + data_copy = self.sample_data.copy() + data_copy["emails"] = [email.copy() for email in self.sample_data["emails"]] + + # Update the work email with a dict containing nested data + update_data = {"metadata": {"verified": True, "source": "manual"}} + self.processor._navigate_and_modify(data_copy, components, update_data, "add") + + work_email = next(email for email in data_copy["emails"] if email.get("type") == "work") + self.assertTrue(work_email["metadata"]["verified"]) + self.assertEqual(work_email["metadata"]["source"], "manual") + + def test_navigate_and_modify_intermediate_navigation_missing_parent(self): + """Test navigation when intermediate parent doesn't exist""" + components = [ + {"attribute": "organization", "filter": None, "sub_attribute": None}, + {"attribute": "department", "filter": None, "sub_attribute": None}, + {"attribute": "name", "filter": None, "sub_attribute": None}, + ] + + data_copy = self.sample_data.copy() + # organization doesn't exist initially + + self.processor._navigate_and_modify(data_copy, components, "Engineering", "add") + + self.assertEqual(data_copy["organization"]["department"]["name"], "Engineering") + + def test_navigate_and_modify_intermediate_navigation_existing_path(self): + """Test navigation when part of the path already exists""" + components = [ + {"attribute": "organization", "filter": None, "sub_attribute": None}, + {"attribute": "department", "filter": None, "sub_attribute": None}, + {"attribute": "budget", "filter": None, "sub_attribute": None}, + ] + + data_copy = self.sample_data.copy() + data_copy["organization"] = {"department": {"name": "IT", "head": "John"}} + + self.processor._navigate_and_modify(data_copy, components, 500000, "add") + + self.assertEqual(data_copy["organization"]["department"]["budget"], 500000) + # Existing data should be preserved + self.assertEqual(data_copy["organization"]["department"]["name"], "IT") + self.assertEqual(data_copy["organization"]["department"]["head"], "John") diff --git a/authentik/sources/scim/tests/test_users.py b/authentik/sources/scim/tests/test_users.py index a862035d30..fded8a1c26 100644 --- a/authentik/sources/scim/tests/test_users.py +++ b/authentik/sources/scim/tests/test_users.py @@ -10,6 +10,7 @@ from authentik.core.tests.utils import create_test_user from authentik.events.models import Event, EventAction from authentik.lib.generators import generate_id from authentik.providers.scim.clients.schema import User as SCIMUserSchema +from authentik.sources.scim.constants import SCIM_URN_USER_ENTERPRISE from authentik.sources.scim.models import SCIMSource, SCIMSourcePropertyMapping, SCIMSourceUser from authentik.sources.scim.views.v2.base import SCIM_CONTENT_TYPE @@ -81,7 +82,9 @@ class TestSCIMUsers(APITestCase): HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", ) self.assertEqual(response.status_code, 201) - self.assertTrue(SCIMSourceUser.objects.filter(source=self.source, id=ext_id).exists()) + self.assertTrue( + SCIMSourceUser.objects.filter(source=self.source, external_id=ext_id).exists() + ) self.assertTrue( Event.objects.filter( action=EventAction.MODEL_CREATED, user__username=self.source.token.user.username @@ -174,14 +177,16 @@ class TestSCIMUsers(APITestCase): ) self.assertEqual(response.status_code, 201) self.assertEqual( - SCIMSourceUser.objects.get(source=self.source, id=ext_id).user.attributes["phone"], + SCIMSourceUser.objects.get(source=self.source, external_id=ext_id).user.attributes[ + "phone" + ], "0123456789", ) def test_user_update(self): """Test user update""" user = create_test_user() - existing = SCIMSourceUser.objects.create(source=self.source, user=user, id=uuid4()) + existing = SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) ext_id = generate_id() response = self.client.put( reverse( @@ -209,10 +214,51 @@ class TestSCIMUsers(APITestCase): ) self.assertEqual(response.status_code, 200) + def test_user_update_patch(self): + """Test user update (patch)""" + user = create_test_user() + existing = SCIMSourceUser.objects.create( + source=self.source, + user=user, + external_id=uuid4(), + attributes={ + "userName": generate_id(), + }, + ) + response = self.client.patch( + reverse( + "authentik_sources_scim:v2-users", + kwargs={ + "source_slug": self.source.slug, + "user_id": str(user.uuid), + }, + ), + data=dumps( + { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "Add", + "path": f"{SCIM_URN_USER_ENTERPRISE}:manager", + "value": "86b2ed3e-30cd-4881-bb58-c4e910821339", + } + ], + } + ), + content_type=SCIM_CONTENT_TYPE, + HTTP_AUTHORIZATION=f"Bearer {self.source.token.key}", + ) + self.assertEqual(response.status_code, 200) + existing.refresh_from_db() + self.assertEqual( + existing.attributes[SCIM_URN_USER_ENTERPRISE], + {"manager": {"value": "86b2ed3e-30cd-4881-bb58-c4e910821339"}}, + ) + def test_user_delete(self): """Test user delete""" user = create_test_user() - SCIMSourceUser.objects.create(source=self.source, user=user, id=uuid4()) + SCIMSourceUser.objects.create(source=self.source, user=user, external_id=uuid4()) response = self.client.delete( reverse( "authentik_sources_scim:v2-users", diff --git a/authentik/sources/scim/tests/test_users_patch.py b/authentik/sources/scim/tests/test_users_patch.py new file mode 100644 index 0000000000..c3cb024c90 --- /dev/null +++ b/authentik/sources/scim/tests/test_users_patch.py @@ -0,0 +1,488 @@ +from rest_framework.test import APITestCase + +from authentik.core.tests.utils import create_test_user +from authentik.lib.generators import generate_id +from authentik.sources.scim.constants import SCIM_URN_USER_ENTERPRISE +from authentik.sources.scim.models import SCIMSource, SCIMSourceUser +from authentik.sources.scim.patch.processor import SCIMPatchProcessor + + +class TestSCIMUsersPatch(APITestCase): + """Test SCIM User Patch""" + + def test_add(self): + req = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + {"op": "Add", "path": "name.givenName", "value": "aqwer"}, + {"op": "Add", "path": "name.familyName", "value": "qwerqqqq"}, + {"op": "Add", "path": "name.formatted", "value": "aqwer qwerqqqq"}, + ], + } + user = create_test_user() + source = SCIMSource.objects.create(slug=generate_id()) + connection = SCIMSourceUser.objects.create( + user=user, + id=generate_id(), + source=source, + attributes={ + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + updated = SCIMPatchProcessor().apply_patches(connection.attributes, req["Operations"]) + self.assertEqual( + updated, + { + "meta": {"resourceType": "User"}, + "active": True, + "name": { + "givenName": "aqwer", + "familyName": "qwerqqqq", + "formatted": "aqwer qwerqqqq", + }, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + + def test_add_no_path(self): + """Test add patch with no path set""" + req = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + {"op": "Add", "value": {"externalId": "aqwer"}}, + ], + } + user = create_test_user() + source = SCIMSource.objects.create(slug=generate_id()) + connection = SCIMSourceUser.objects.create( + user=user, + id=generate_id(), + source=source, + attributes={ + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "displayName": "Test MS", + }, + ) + updated = SCIMPatchProcessor().apply_patches(connection.attributes, req["Operations"]) + self.assertEqual( + updated, + { + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "aqwer", + "displayName": "Test MS", + }, + ) + + def test_replace(self): + req = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + {"op": "Replace", "path": "name", "value": {"givenName": "aqwer"}}, + ], + } + user = create_test_user() + source = SCIMSource.objects.create(slug=generate_id()) + connection = SCIMSourceUser.objects.create( + user=user, + id=generate_id(), + source=source, + attributes={ + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + updated = SCIMPatchProcessor().apply_patches(connection.attributes, req["Operations"]) + self.assertEqual( + updated, + { + "meta": {"resourceType": "User"}, + "active": True, + "name": { + "givenName": "aqwer", + }, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + + def test_replace_no_path(self): + """Test value replace with no path""" + req = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + {"op": "Replace", "value": {"externalId": "aqwer"}}, + ], + } + user = create_test_user() + source = SCIMSource.objects.create(slug=generate_id()) + connection = SCIMSourceUser.objects.create( + user=user, + id=generate_id(), + source=source, + attributes={ + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + updated = SCIMPatchProcessor().apply_patches(connection.attributes, req["Operations"]) + self.assertEqual( + updated, + { + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "aqwer", + "displayName": "Test MS", + }, + ) + + def test_remove(self): + req = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + {"op": "Remove", "path": "name", "value": {"givenName": "aqwer"}}, + ], + } + user = create_test_user() + source = SCIMSource.objects.create(slug=generate_id()) + connection = SCIMSourceUser.objects.create( + user=user, + id=generate_id(), + source=source, + attributes={ + "meta": {"resourceType": "User"}, + "active": True, + "name": { + "givenName": "aqwer", + }, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + updated = SCIMPatchProcessor().apply_patches(connection.attributes, req["Operations"]) + self.assertEqual( + updated, + { + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + + def test_large(self): + """Large amount of patch operations""" + req = { + "Operations": [ + { + "op": "replace", + "path": "emails[primary eq true].value", + "value": "dandre_kling@wintheiser.info", + }, + { + "op": "replace", + "path": "phoneNumbers[primary eq true].value", + "value": "72-634-1548", + }, + { + "op": "replace", + "path": "phoneNumbers[primary eq true].display", + "value": "72-634-1548", + }, + {"op": "replace", "path": "ims[primary eq true].value", "value": "GXSGJKWGHVVS"}, + {"op": "replace", "path": "ims[primary eq true].display", "value": "IMCHDKUQIPYB"}, + { + "op": "replace", + "path": "photos[primary eq true].display", + "value": "TWAWLHHSUNIV", + }, + { + "op": "replace", + "path": "addresses[primary eq true].formatted", + "value": "TMINZQAJQDCL", + }, + { + "op": "replace", + "path": "addresses[primary eq true].streetAddress", + "value": "081 Wisoky Key", + }, + { + "op": "replace", + "path": "addresses[primary eq true].locality", + "value": "DPFASBZRPMDP", + }, + { + "op": "replace", + "path": "addresses[primary eq true].region", + "value": "WHSTJSPIPTCF", + }, + { + "op": "replace", + "path": "addresses[primary eq true].postalCode", + "value": "ko28 1qa", + }, + {"op": "replace", "path": "addresses[primary eq true].country", "value": "Taiwan"}, + { + "op": "replace", + "path": "entitlements[primary eq true].value", + "value": "NGBJMUYZVVBX", + }, + {"op": "replace", "path": "roles[primary eq true].value", "value": "XEELVFMMWCVM"}, + { + "op": "replace", + "path": "x509Certificates[primary eq true].value", + "value": "UYISMEDOXUZY", + }, + { + "op": "replace", + "value": { + "externalId": "7faaefb0-0774-4d8e-8f6d-863c361bc72c", + "name.formatted": "Dell", + "name.familyName": "Gay", + "name.givenName": "Kyler", + "name.middleName": "Hannah", + "name.honorificPrefix": "Cassie", + "name.honorificSuffix": "Yolanda", + "displayName": "DPRLIJSFQMTL", + "nickName": "BKSPMIRMFBTI", + "title": "NBZCOAXVYJUY", + "userType": "ZGJMYZRUORZE", + "preferredLanguage": "as-IN", + "locale": "JLOJHLPWZODG", + "timezone": "America/Argentina/Rio_Gallegos", + "active": True, + f"{SCIM_URN_USER_ENTERPRISE}:employeeNumber": "PDFWRRZBQOHB", + f"{SCIM_URN_USER_ENTERPRISE}:costCenter": "HACMZWSEDOTQ", + f"{SCIM_URN_USER_ENTERPRISE}:organization": "LXVHJUOLNCLS", + f"{SCIM_URN_USER_ENTERPRISE}:division": "JASVTPKPBPMG", + f"{SCIM_URN_USER_ENTERPRISE}:department": "GMSBFLMNPABY", + }, + }, + ], + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + } + user = create_test_user() + source = SCIMSource.objects.create(slug=generate_id()) + connection = SCIMSourceUser.objects.create( + user=user, + id=generate_id(), + source=source, + attributes={ + "active": True, + "addresses": [ + { + "primary": "true", + "formatted": "BLJMCNXHYLZK", + "streetAddress": "7801 Jacobs Fork", + "locality": "HZJBJWFAKXDD", + "region": "GJXCXPMIIKWK", + "postalCode": "pv82 8ua", + "country": "India", + } + ], + "displayName": "KEFXCHKHAFOT", + "emails": [{"primary": "true", "value": "scot@zemlak.uk"}], + "entitlements": [{"primary": "true", "value": "FTTUXWYDAAQC"}], + "externalId": "448d2786-7bf6-4e03-a4ef-64cbaf162fa7", + "ims": [{"primary": "true", "value": "IGWZUUMCMKXS", "display": "PJVGMMKYYHRU"}], + "locale": "PJNYJHWJILTI", + "name": { + "formatted": "Ladarius", + "familyName": "Manley", + "givenName": "Mazie", + "middleName": "Vernon", + "honorificPrefix": "Melyssa", + "honorificSuffix": "Demarcus", + }, + "nickName": "HTPKOXMWZKHL", + "phoneNumbers": [ + {"primary": "true", "value": "50-608-7660", "display": "50-608-7660"} + ], + "photos": [{"primary": "true", "display": "KCONLNLSYTBP"}], + "preferredLanguage": "wae", + "profileUrl": "HPSEOIPXMGOH", + "roles": [{"primary": "true", "value": "TLGYITOIZGKP"}], + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "timezone": "America/Indiana/Petersburg", + "title": "EJWFXLHNHMCD", + SCIM_URN_USER_ENTERPRISE: { + "employeeNumber": "XHDMEJUURJNR", + "costCenter": "RXUYBXOTRCZH", + "organization": "CEXWXMBRYAHN", + "division": "XMPFMDCLRKCW", + "department": "BKMNJVMCJUYS", + "manager": "PNGSGXLYVWMV", + }, + "userName": "imelda.auer@kshlerin.co.uk", + "userType": "PZFXORVSUAPU", + "x509Certificates": [{"primary": "true", "value": "KOVKWGIVVEHH"}], + }, + ) + updated = SCIMPatchProcessor().apply_patches(connection.attributes, req["Operations"]) + self.assertEqual( + updated, + { + "active": True, + "addresses": [ + { + "primary": "true", + "formatted": "BLJMCNXHYLZK", + "streetAddress": "7801 Jacobs Fork", + "locality": "HZJBJWFAKXDD", + "region": "GJXCXPMIIKWK", + "postalCode": "pv82 8ua", + "country": "India", + } + ], + "displayName": "DPRLIJSFQMTL", + "emails": [{"primary": "true", "value": "scot@zemlak.uk"}], + "entitlements": [{"primary": "true", "value": "FTTUXWYDAAQC"}], + "externalId": "7faaefb0-0774-4d8e-8f6d-863c361bc72c", + "ims": [{"primary": "true", "value": "IGWZUUMCMKXS", "display": "PJVGMMKYYHRU"}], + "locale": "JLOJHLPWZODG", + "name": { + "formatted": "Dell", + "familyName": "Gay", + "givenName": "Kyler", + "middleName": "Hannah", + "honorificPrefix": "Cassie", + "honorificSuffix": "Yolanda", + }, + "nickName": "BKSPMIRMFBTI", + "phoneNumbers": [ + {"primary": "true", "value": "50-608-7660", "display": "50-608-7660"} + ], + "photos": [{"primary": "true", "display": "KCONLNLSYTBP"}], + "preferredLanguage": "as-IN", + "profileUrl": "HPSEOIPXMGOH", + "roles": [{"primary": "true", "value": "TLGYITOIZGKP"}], + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "timezone": "America/Argentina/Rio_Gallegos", + "title": "NBZCOAXVYJUY", + SCIM_URN_USER_ENTERPRISE: { + "employeeNumber": "PDFWRRZBQOHB", + "costCenter": "HACMZWSEDOTQ", + "organization": "LXVHJUOLNCLS", + "division": "JASVTPKPBPMG", + "department": "GMSBFLMNPABY", + "manager": "PNGSGXLYVWMV", + }, + "userName": "imelda.auer@kshlerin.co.uk", + "userType": "ZGJMYZRUORZE", + "x509Certificates": [{"primary": "true", "value": "KOVKWGIVVEHH"}], + }, + ) + + def test_schema_urn_manager(self): + req = { + "schemas": ["urn:ietf:params:scim:api:messages:2.0:PatchOp"], + "Operations": [ + { + "op": "Add", + "value": { + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User:manager": "foo" + }, + }, + ], + } + user = create_test_user() + source = SCIMSource.objects.create(slug=generate_id()) + connection = SCIMSourceUser.objects.create( + user=user, + id=generate_id(), + source=source, + attributes={ + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + }, + ) + updated = SCIMPatchProcessor().apply_patches(connection.attributes, req["Operations"]) + self.assertEqual( + updated, + { + "meta": {"resourceType": "User"}, + "active": True, + "schemas": [ + "urn:ietf:params:scim:schemas:core:2.0:User", + SCIM_URN_USER_ENTERPRISE, + ], + "userName": "test@t.goauthentik.io", + "externalId": "test", + "displayName": "Test MS", + "urn:ietf:params:scim:schemas:extension:enterprise:2.0:User": { + "manager": {"value": "foo"} + }, + }, + ) diff --git a/authentik/sources/scim/views/v2/base.py b/authentik/sources/scim/views/v2/base.py index 9f17f78a6f..94d13dceae 100644 --- a/authentik/sources/scim/views/v2/base.py +++ b/authentik/sources/scim/views/v2/base.py @@ -1,6 +1,7 @@ """SCIM Utils""" from typing import Any +from uuid import UUID from django.conf import settings from django.core.paginator import Page, Paginator @@ -21,6 +22,7 @@ from authentik.core.sources.mapper import SourceMapper from authentik.lib.sync.mapper import PropertyMappingManager from authentik.sources.scim.models import SCIMSource from authentik.sources.scim.views.v2.auth import SCIMTokenAuth +from authentik.sources.scim.views.v2.exceptions import SCIMNotFoundError SCIM_CONTENT_TYPE = "application/scim+json" @@ -54,6 +56,13 @@ class SCIMView(APIView): def get_authenticators(self): return [SCIMTokenAuth(self)] + def remove_excluded_attributes(self, data: dict): + """Remove attributes specified in excludedAttributes""" + excluded: str = self.request.query_params.get("excludedAttributes", "") + for key in excluded.split(","): + data.pop(key.strip(), None) + return data + def filter_parse(self, request: Request): """Parse the path of a Patch Operation""" path = request.query_params.get("filter") @@ -103,6 +112,12 @@ class SCIMObjectView(SCIMView): # a source attribute before self.mapper = SourceMapper(self.source) self.manager = self.mapper.get_manager(self.model, ["data"]) + for key, value in kwargs.items(): + if key.endswith("_id"): + try: + UUID(value) + except ValueError: + raise SCIMNotFoundError("Invalid ID") from None def build_object_properties(self, data: dict[str, Any]) -> dict[str, Any | dict[str, Any]]: return self.mapper.build_object_properties( diff --git a/authentik/sources/scim/views/v2/groups.py b/authentik/sources/scim/views/v2/groups.py index 15b443286e..401b11ec9b 100644 --- a/authentik/sources/scim/views/v2/groups.py +++ b/authentik/sources/scim/views/v2/groups.py @@ -17,6 +17,7 @@ from authentik.core.models import Group, User from authentik.providers.scim.clients.schema import SCIM_GROUP_SCHEMA, PatchOp, PatchOperation from authentik.providers.scim.clients.schema import Group as SCIMGroupModel from authentik.sources.scim.models import SCIMSourceGroup +from authentik.sources.scim.patch.processor import SCIMPatchProcessor from authentik.sources.scim.views.v2.base import SCIMObjectView from authentik.sources.scim.views.v2.exceptions import ( SCIMConflictError, @@ -35,11 +36,12 @@ class GroupsView(SCIMObjectView): payload = SCIMGroupModel( schemas=[SCIM_GROUP_SCHEMA], id=str(scim_group.group.pk), - externalId=scim_group.id, + externalId=scim_group.external_id, displayName=scim_group.group.name, members=[], meta={ "resourceType": "Group", + "lastModified": scim_group.last_update, "location": self.request.build_absolute_uri( reverse( "authentik_sources_scim:v2-groups", @@ -54,7 +56,11 @@ class GroupsView(SCIMObjectView): for member in scim_group.group.users.order_by("pk"): member: User payload.members.append(GroupMember(value=str(member.uuid))) - return payload.model_dump(mode="json", exclude_unset=True) + final_payload = payload.model_dump(mode="json", exclude_unset=True) + final_payload.update(scim_group.attributes) + return self.remove_excluded_attributes( + SCIMGroupModel.model_validate(final_payload).model_dump(mode="json", exclude_unset=True) + ) def get(self, request: Request, group_id: str | None = None, **kwargs) -> Response: """List Group handler""" @@ -81,7 +87,7 @@ class GroupsView(SCIMObjectView): ) @atomic - def update_group(self, connection: SCIMSourceGroup | None, data: QueryDict): + def update_group(self, connection: SCIMSourceGroup | None, data: QueryDict, apply_members=True): """Partial update a group""" properties = self.build_object_properties(data) @@ -94,7 +100,7 @@ class GroupsView(SCIMObjectView): group.update_attributes(properties) - if "members" in data: + if "members" in data and apply_members: query = Q() for _member in data.get("members", []): try: @@ -105,14 +111,18 @@ class GroupsView(SCIMObjectView): query |= Q(uuid=member.value) if query: group.users.set(User.objects.filter(query)) + data["members"] = self._convert_members(group) if not connection: - connection, _ = SCIMSourceGroup.objects.get_or_create( + connection, _ = SCIMSourceGroup.objects.update_or_create( + external_id=data.get("externalId") or str(uuid4()), source=self.source, group=group, - attributes=data, - id=data.get("externalId") or str(uuid4()), + defaults={ + "attributes": data, + }, ) else: + connection.external_id = data.get("externalId", connection.external_id) connection.attributes = data connection.save() return connection @@ -139,6 +149,12 @@ class GroupsView(SCIMObjectView): connection = self.update_group(connection, request.data) return Response(self.group_to_scim(connection), status=200) + def _convert_members(self, group: Group): + users = [] + for user in group.users.all().order_by("uuid"): + users.append({"value": str(user.uuid)}) + return sorted(users, key=lambda u: u["value"]) + @atomic def patch(self, request: Request, group_id: str, **kwargs) -> Response: """Patch group handler""" @@ -171,6 +187,13 @@ class GroupsView(SCIMObjectView): query |= Q(uuid=member["value"]) if query: connection.group.users.remove(*User.objects.filter(query)) + patcher = SCIMPatchProcessor() + patched_data = patcher.apply_patches( + connection.attributes, request.data.get("Operations", []) + ) + patched_data["members"] = self._convert_members(connection.group) + if patched_data != connection.attributes: + self.update_group(connection, patched_data, apply_members=False) return Response(self.group_to_scim(connection), status=200) @atomic diff --git a/authentik/sources/scim/views/v2/service_provider_config.py b/authentik/sources/scim/views/v2/service_provider_config.py index d3dfd623f6..3b2c720979 100644 --- a/authentik/sources/scim/views/v2/service_provider_config.py +++ b/authentik/sources/scim/views/v2/service_provider_config.py @@ -33,9 +33,7 @@ class ServiceProviderConfigView(SCIMView): { "schemas": ["urn:ietf:params:scim:schemas:core:2.0:ServiceProviderConfig"], "authenticationSchemes": auth_schemas, - # We only support patch for groups currently, so don't broadly advertise it. - # Implementations that require Group patch will use it regardless of this flag. - "patch": {"supported": False}, + "patch": {"supported": True}, "bulk": {"supported": False, "maxOperations": 0, "maxPayloadSize": 0}, "filter": { "supported": True, diff --git a/authentik/sources/scim/views/v2/users.py b/authentik/sources/scim/views/v2/users.py index d29bc611bb..c9c4066e92 100644 --- a/authentik/sources/scim/views/v2/users.py +++ b/authentik/sources/scim/views/v2/users.py @@ -15,6 +15,7 @@ from authentik.core.models import User from authentik.providers.scim.clients.schema import SCIM_USER_SCHEMA from authentik.providers.scim.clients.schema import User as SCIMUserModel from authentik.sources.scim.models import SCIMSourceUser +from authentik.sources.scim.patch.processor import SCIMPatchProcessor from authentik.sources.scim.views.v2.base import SCIMObjectView from authentik.sources.scim.views.v2.exceptions import SCIMConflictError, SCIMNotFoundError @@ -29,7 +30,7 @@ class UsersView(SCIMObjectView): payload = SCIMUserModel( schemas=[SCIM_USER_SCHEMA], id=str(scim_user.user.uuid), - externalId=scim_user.id, + externalId=scim_user.external_id, userName=scim_user.user.username, name=Name( formatted=scim_user.user.name, @@ -44,8 +45,7 @@ class UsersView(SCIMObjectView): meta={ "resourceType": "User", "created": scim_user.user.date_joined, - # TODO: use events to find last edit? - "lastModified": scim_user.user.date_joined, + "lastModified": scim_user.last_update, "location": self.request.build_absolute_uri( reverse( "authentik_sources_scim:v2-users", @@ -59,7 +59,9 @@ class UsersView(SCIMObjectView): ) final_payload = payload.model_dump(mode="json", exclude_unset=True) final_payload.update(scim_user.attributes) - return final_payload + return self.remove_excluded_attributes( + SCIMUserModel.model_validate(final_payload).model_dump(mode="json", exclude_unset=True) + ) def get(self, request: Request, user_id: str | None = None, **kwargs) -> Response: """List User handler""" @@ -101,13 +103,16 @@ class UsersView(SCIMObjectView): user.update_attributes(properties) if not connection: - connection, _ = SCIMSourceUser.objects.get_or_create( + connection, _ = SCIMSourceUser.objects.update_or_create( + external_id=data.get("externalId") or str(uuid4()), source=self.source, user=user, - attributes=data, - id=data.get("externalId") or str(uuid4()), + defaults={ + "attributes": data, + }, ) else: + connection.external_id = data.get("externalId", connection.external_id) connection.attributes = data connection.save() return connection @@ -127,6 +132,18 @@ class UsersView(SCIMObjectView): connection = self.update_user(None, request.data) return Response(self.user_to_scim(connection), status=201) + def patch(self, request: Request, user_id: str, **kwargs): + connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first() + if not connection: + raise SCIMNotFoundError("User not found.") + patcher = SCIMPatchProcessor() + patched_data = patcher.apply_patches( + connection.attributes, request.data.get("Operations", []) + ) + if patched_data != connection.attributes: + self.update_user(connection, patched_data) + return Response(self.user_to_scim(connection), status=200) + def put(self, request: Request, user_id: str, **kwargs) -> Response: """Update user handler""" connection = SCIMSourceUser.objects.filter(source=self.source, user__uuid=user_id).first() diff --git a/schema.yml b/schema.yml index a9c7009a27..5f46c114d9 100644 --- a/schema.yml +++ b/schema.yml @@ -55232,6 +55232,9 @@ components: id: type: string minLength: 1 + external_id: + type: string + minLength: 1 group: type: string format: uuid @@ -55296,6 +55299,9 @@ components: id: type: string minLength: 1 + external_id: + type: string + minLength: 1 user: type: integer source: @@ -58946,6 +58952,8 @@ components: properties: id: type: string + external_id: + type: string group: type: string format: uuid @@ -58960,6 +58968,7 @@ components: type: object additionalProperties: {} required: + - external_id - group - group_obj - id @@ -58971,6 +58980,9 @@ components: id: type: string minLength: 1 + external_id: + type: string + minLength: 1 group: type: string format: uuid @@ -58981,6 +58993,7 @@ components: type: object additionalProperties: {} required: + - external_id - group - id - source @@ -59089,6 +59102,8 @@ components: properties: id: type: string + external_id: + type: string user: type: integer user_obj: @@ -59102,6 +59117,7 @@ components: type: object additionalProperties: {} required: + - external_id - id - source - user @@ -59113,6 +59129,9 @@ components: id: type: string minLength: 1 + external_id: + type: string + minLength: 1 user: type: integer source: @@ -59122,6 +59141,7 @@ components: type: object additionalProperties: {} required: + - external_id - id - source - user diff --git a/web/src/admin/sources/scim/SCIMSourceGroups.ts b/web/src/admin/sources/scim/SCIMSourceGroups.ts index ac01837fd8..2f9d4dfc25 100644 --- a/web/src/admin/sources/scim/SCIMSourceGroups.ts +++ b/web/src/admin/sources/scim/SCIMSourceGroups.ts @@ -42,7 +42,7 @@ export class SCIMSourceGroupList extends Table { html`
${item.groupObj.name}
`, - html`${item.id}`, + html`${item.externalId}`, ]; } } diff --git a/web/src/admin/sources/scim/SCIMSourceUsers.ts b/web/src/admin/sources/scim/SCIMSourceUsers.ts index 65109560d7..8adcb9942d 100644 --- a/web/src/admin/sources/scim/SCIMSourceUsers.ts +++ b/web/src/admin/sources/scim/SCIMSourceUsers.ts @@ -43,7 +43,7 @@ export class SCIMSourceUserList extends Table {
${item.userObj.username}
${item.userObj.name} `, - html`${item.id}`, + html`${item.externalId}`, ]; } }