mirror of
https://github.com/goauthentik/authentik.git
synced 2026-06-17 19:09:11 +03:00
packages/ak-guardian: cast safely (#18929)
* packages/ak-guardian: cast safely * use `regexp_like` instead of `pg_input_is_valid` * alternative approach: RawSQL subquery * remove extra fields we don't need Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * prevent subquery collapse Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * take into account foreignkeys Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * shut up bandit Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> * clean up a bit Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> --------- Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space> Co-authored-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
@@ -1,27 +1,18 @@
|
||||
"""Convenient shortcuts to manage or check object permissions."""
|
||||
|
||||
from functools import lru_cache, partial
|
||||
from functools import lru_cache
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from django.contrib.auth.models import Permission
|
||||
from django.contrib.contenttypes.models import ContentType
|
||||
from django.db import connection
|
||||
from django.db.models import (
|
||||
AutoField,
|
||||
BigIntegerField,
|
||||
CharField,
|
||||
Count,
|
||||
ForeignKey,
|
||||
IntegerField,
|
||||
Model,
|
||||
PositiveIntegerField,
|
||||
PositiveSmallIntegerField,
|
||||
QuerySet,
|
||||
SmallIntegerField,
|
||||
UUIDField,
|
||||
)
|
||||
from django.db.models.expressions import Value
|
||||
from django.db.models.functions import Cast, Replace
|
||||
from django.db.models.expressions import RawSQL
|
||||
|
||||
from guardian.core import ObjectPermissionChecker
|
||||
from guardian.ctypes import get_content_type
|
||||
@@ -295,42 +286,33 @@ def get_objects_for_user( # noqa: PLR0912 PLR0915
|
||||
.filter(object_pk_count__gte=len(codenames))
|
||||
)
|
||||
|
||||
# object_pk is a varchar, while the queryset's pk is probably an integer or a uuid, so we cast
|
||||
handle_pk_field = _handle_pk_field(queryset)
|
||||
if handle_pk_field is not None:
|
||||
perms_queryset = perms_queryset.annotate(obj_pk=handle_pk_field(expression=pk_field))
|
||||
pk_field = "obj_pk"
|
||||
|
||||
return queryset.filter(pk__in=perms_queryset.values_list(pk_field, flat=True))
|
||||
|
||||
|
||||
def _handle_pk_field(queryset):
|
||||
# pk is either UUID or an integer type, while object_pk is a varchar
|
||||
pk = queryset.model._meta.pk
|
||||
|
||||
if isinstance(pk, ForeignKey):
|
||||
return _handle_pk_field(pk.target_field)
|
||||
def _cast_type(pk):
|
||||
if isinstance(pk, ForeignKey):
|
||||
return _cast_type(pk.target_field)
|
||||
if isinstance(pk, UUIDField):
|
||||
return "uuid"
|
||||
return "bigint"
|
||||
|
||||
if isinstance( # noqa: UP038
|
||||
pk,
|
||||
(
|
||||
IntegerField,
|
||||
AutoField,
|
||||
BigIntegerField,
|
||||
PositiveIntegerField,
|
||||
PositiveSmallIntegerField,
|
||||
SmallIntegerField,
|
||||
),
|
||||
):
|
||||
return partial(Cast, output_field=BigIntegerField())
|
||||
cast_type = _cast_type(pk)
|
||||
|
||||
if isinstance(pk, UUIDField):
|
||||
if connection.features.has_native_uuid_field:
|
||||
return partial(Cast, output_field=UUIDField())
|
||||
return partial(
|
||||
Replace,
|
||||
text=Value("-"),
|
||||
replacement=Value(""),
|
||||
output_field=CharField(),
|
||||
)
|
||||
|
||||
return None
|
||||
perms_queryset = perms_queryset.values_list(pk_field, flat=True)
|
||||
# The raw subquery is done to ensure that casting only takes place after the WHERE clause of
|
||||
# `perms_queryset` is ran. Otherwise, the query planner may decide to cast every `object_pk`,
|
||||
# which breaks (for example) if it tries to cast an integer to a UUID. In such a case, the WHERE
|
||||
# of `perms_queryset` will remove any integer.
|
||||
# However, the subquery might get optimized out by the query planner, which would cause the same
|
||||
# cast issue as before. To prevent the subquery from being collapsed in the query below, we add
|
||||
# OFFSET 0.
|
||||
perms_subquery_sql, perms_subquery_params = perms_queryset.query.sql_with_params()
|
||||
subquery = RawSQL(
|
||||
f"""
|
||||
SELECT ("permission_subquery"."{pk_field}")::{cast_type} as "object_pk"
|
||||
FROM ({perms_subquery_sql}) "permission_subquery"
|
||||
OFFSET 0
|
||||
""", # nosec
|
||||
perms_subquery_params,
|
||||
)
|
||||
return queryset.filter(pk__in=subquery)
|
||||
|
||||
Reference in New Issue
Block a user