diff --git a/packages/ak-guardian/guardian/shortcuts.py b/packages/ak-guardian/guardian/shortcuts.py index b8ceb55a05..14cf18bef2 100644 --- a/packages/ak-guardian/guardian/shortcuts.py +++ b/packages/ak-guardian/guardian/shortcuts.py @@ -1,27 +1,18 @@ """Convenient shortcuts to manage or check object permissions.""" -from functools import lru_cache, partial +from functools import lru_cache from typing import Any, TypeVar from django.contrib.auth.models import Permission from django.contrib.contenttypes.models import ContentType -from django.db import connection from django.db.models import ( - AutoField, - BigIntegerField, - CharField, Count, ForeignKey, - IntegerField, Model, - PositiveIntegerField, - PositiveSmallIntegerField, QuerySet, - SmallIntegerField, UUIDField, ) -from django.db.models.expressions import Value -from django.db.models.functions import Cast, Replace +from django.db.models.expressions import RawSQL from guardian.core import ObjectPermissionChecker from guardian.ctypes import get_content_type @@ -295,42 +286,33 @@ def get_objects_for_user( # noqa: PLR0912 PLR0915 .filter(object_pk_count__gte=len(codenames)) ) - # object_pk is a varchar, while the queryset's pk is probably an integer or a uuid, so we cast - handle_pk_field = _handle_pk_field(queryset) - if handle_pk_field is not None: - perms_queryset = perms_queryset.annotate(obj_pk=handle_pk_field(expression=pk_field)) - pk_field = "obj_pk" - - return queryset.filter(pk__in=perms_queryset.values_list(pk_field, flat=True)) - - -def _handle_pk_field(queryset): + # pk is either UUID or an integer type, while object_pk is a varchar pk = queryset.model._meta.pk - if isinstance(pk, ForeignKey): - return _handle_pk_field(pk.target_field) + def _cast_type(pk): + if isinstance(pk, ForeignKey): + return _cast_type(pk.target_field) + if isinstance(pk, UUIDField): + return "uuid" + return "bigint" - if isinstance( # noqa: UP038 - pk, - ( - IntegerField, - AutoField, - BigIntegerField, - PositiveIntegerField, - PositiveSmallIntegerField, - SmallIntegerField, - ), - ): - return partial(Cast, output_field=BigIntegerField()) + cast_type = _cast_type(pk) - if isinstance(pk, UUIDField): - if connection.features.has_native_uuid_field: - return partial(Cast, output_field=UUIDField()) - return partial( - Replace, - text=Value("-"), - replacement=Value(""), - output_field=CharField(), - ) - - return None + perms_queryset = perms_queryset.values_list(pk_field, flat=True) + # The raw subquery is done to ensure that casting only takes place after the WHERE clause of + # `perms_queryset` is ran. Otherwise, the query planner may decide to cast every `object_pk`, + # which breaks (for example) if it tries to cast an integer to a UUID. In such a case, the WHERE + # of `perms_queryset` will remove any integer. + # However, the subquery might get optimized out by the query planner, which would cause the same + # cast issue as before. To prevent the subquery from being collapsed in the query below, we add + # OFFSET 0. + perms_subquery_sql, perms_subquery_params = perms_queryset.query.sql_with_params() + subquery = RawSQL( + f""" + SELECT ("permission_subquery"."{pk_field}")::{cast_type} as "object_pk" + FROM ({perms_subquery_sql}) "permission_subquery" + OFFSET 0 + """, # nosec + perms_subquery_params, + ) + return queryset.filter(pk__in=subquery)