mirror of
https://github.com/goauthentik/authentik.git
synced 2026-06-17 19:09:11 +03:00
05005f4eb9
* Add meta_hide field to hide apps * exclude hidden applications from user dashboard * Add the hide option to the UI * Add schema * Add hide setting to application wizard * Add typescript client changes * fix linting * Convert blank://blank to meta_hide=True in the migration * fix tests * update docs * fix continuous login Signed-off-by: Jens Langhammer <jens@goauthentik.io> * Apply suggestions from code review Co-authored-by: Dewi Roberts <dewi@goauthentik.io> Signed-off-by: Marcelo Elizeche Landó <marce@melizeche.com> * fix linting * fix migrations * Apply suggestions from code review Co-authored-by: Dominic R <dominic@sdko.org> Signed-off-by: Marcelo Elizeche Landó <marce@melizeche.com> * rename all mentions of dashboard to My applications * generate schema * generate TS client --------- Signed-off-by: Jens Langhammer <jens@goauthentik.io> Signed-off-by: Marcelo Elizeche Landó <marce@melizeche.com> Co-authored-by: Jens Langhammer <jens@goauthentik.io> Co-authored-by: Dewi Roberts <dewi@goauthentik.io> Co-authored-by: Dominic R <dominic@sdko.org>
342 lines
13 KiB
Python
342 lines
13 KiB
Python
"""Application API Views"""
|
|
|
|
from collections.abc import Iterator
|
|
from copy import copy
|
|
|
|
from django.core.cache import cache
|
|
from django.db.models import Case, QuerySet
|
|
from django.db.models.expressions import When
|
|
from django.shortcuts import get_object_or_404
|
|
from django.utils.translation import gettext as _
|
|
from drf_spectacular.types import OpenApiTypes
|
|
from drf_spectacular.utils import OpenApiParameter, extend_schema
|
|
from guardian.shortcuts import get_objects_for_user
|
|
from rest_framework.decorators import action
|
|
from rest_framework.exceptions import ValidationError
|
|
from rest_framework.fields import CharField, ReadOnlyField, SerializerMethodField
|
|
from rest_framework.request import Request
|
|
from rest_framework.response import Response
|
|
from rest_framework.viewsets import ModelViewSet
|
|
from structlog.stdlib import get_logger
|
|
|
|
from authentik.api.pagination import Pagination
|
|
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
|
|
from authentik.core.api.providers import ProviderSerializer
|
|
from authentik.core.api.used_by import UsedByMixin
|
|
from authentik.core.api.users import UserSerializer
|
|
from authentik.core.api.utils import ModelSerializer, ThemedUrlsSerializer
|
|
from authentik.core.apps import AppAccessWithoutBindings
|
|
from authentik.core.models import Application, User
|
|
from authentik.events.logs import LogEventSerializer, capture_logs
|
|
from authentik.policies.api.exec import PolicyTestResultSerializer
|
|
from authentik.policies.engine import PolicyEngine
|
|
from authentik.policies.types import CACHE_PREFIX, PolicyResult
|
|
from authentik.rbac.filters import ObjectFilter
|
|
|
|
LOGGER = get_logger()
|
|
|
|
|
|
def user_app_cache_key(
    user_pk: str, page_number: int | None = None, only_with_launch_url: bool = False
) -> str:
    """Build the cache key under which a user's allowed application list is stored.

    The key always starts with ``{CACHE_PREFIX}app_access/{user_pk}``. A
    ``/launch`` segment is appended when ``only_with_launch_url`` is set, and a
    ``/{page_number}`` segment is appended last when ``page_number`` is truthy.
    """
    segments = [f"{CACHE_PREFIX}app_access/{user_pk}"]
    if only_with_launch_url:
        segments.append("launch")
    if page_number:
        segments.append(f"{page_number}")
    return "/".join(segments)
|
|
|
|
|
|
class ApplicationSerializer(ModelSerializer):
    """Application Serializer"""

    # NOTE(review): the docstrings on this class and on get_launch_url are kept
    # verbatim; drf-spectacular may surface them in the generated schema, so
    # confirm before rewording them.

    # Computed per request by get_launch_url() below.
    launch_url = SerializerMethodField()
    # Read-only nested view of the value returned by the model's `get_provider`.
    provider_obj = ProviderSerializer(
        source="get_provider",
        required=False,
        read_only=True,
        allow_null=True,
    )
    # Read-only nested view of the `backchannel_providers` relation.
    backchannel_providers_obj = ProviderSerializer(
        source="backchannel_providers",
        required=False,
        read_only=True,
        many=True,
    )

    # Read-only values sourced from the model's `get_meta_icon` and
    # `get_meta_icon_themed_urls`.
    meta_icon_url = ReadOnlyField(source="get_meta_icon")
    meta_icon_themed_urls = ThemedUrlsSerializer(
        source="get_meta_icon_themed_urls",
        read_only=True,
        allow_null=True,
    )

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        if SERIALIZER_CONTEXT_BLUEPRINT not in self.context:
            return
        # Only when serializing in a blueprint context: expose `meta_icon`
        # under the additional field name `icon`.
        self.fields["icon"] = CharField(source="meta_icon", required=False)

    def get_launch_url(self, app: Application) -> str | None:
        """Allow formatting of launch URL"""
        context = self.context
        user = context["request"].user if "request" in context else None
        if user is None:
            # No request (or no user on it): nothing to format the URL with.
            return app.get_launch_url(None, user_data=None)

        # Serialize the user a single time and keep the result in the shared
        # serializer context. Without this, every application in a list would
        # run UserSerializer again (which accesses user.groups), i.e. an N+1.
        if "_cached_user_data" not in context:
            context["_cached_user_data"] = UserSerializer(instance=user).data
        return app.get_launch_url(user, user_data=context["_cached_user_data"])

    def validate_slug(self, slug: str) -> str:
        # Reject any slug listed in Application.reserved_slugs; all other
        # slugs are returned unchanged.
        if slug not in Application.reserved_slugs:
            return slug
        message = _("The slug '{slug}' is reserved and cannot be used for applications.")
        raise ValidationError(message.format(slug=slug))

    class Meta:
        model = Application
        fields = [
            "pk",
            "name",
            "slug",
            "provider",
            "provider_obj",
            "backchannel_providers",
            "backchannel_providers_obj",
            "launch_url",
            "open_in_new_tab",
            "meta_launch_url",
            "meta_icon",
            "meta_icon_url",
            "meta_icon_themed_urls",
            "meta_description",
            "meta_publisher",
            "policy_engine_mode",
            "group",
            "meta_hide",
        ]
        extra_kwargs = {
            "backchannel_providers": {"required": False},
        }
|
|
|
|
|
|
class ApplicationViewSet(UsedByMixin, ModelViewSet):
|
|
"""Application Viewset"""
|
|
|
|
queryset = (
|
|
Application.objects.all()
|
|
.with_provider()
|
|
.prefetch_related("policies")
|
|
.prefetch_related("backchannel_providers")
|
|
)
|
|
serializer_class = ApplicationSerializer
|
|
search_fields = [
|
|
"name",
|
|
"slug",
|
|
"meta_launch_url",
|
|
"meta_description",
|
|
"meta_publisher",
|
|
"group",
|
|
]
|
|
filterset_fields = [
|
|
"name",
|
|
"slug",
|
|
"meta_launch_url",
|
|
"meta_description",
|
|
"meta_publisher",
|
|
"group",
|
|
]
|
|
lookup_field = "slug"
|
|
ordering = ["name"]
|
|
|
|
def _filter_queryset_for_list(self, queryset: QuerySet) -> QuerySet:
    """Apply every configured filter backend except ``ObjectFilter``.

    Each remaining backend in ``self.filter_backends`` is instantiated and
    applied in order, so the other backends (the original author mentions
    sorting) still take effect while ``ObjectFilter`` is skipped.
    """
    applicable_backends = (
        backend_cls for backend_cls in self.filter_backends if backend_cls != ObjectFilter
    )
    for backend_cls in applicable_backends:
        queryset = backend_cls().filter_queryset(self.request, queryset, self)
    return queryset
|
|
|
|
def _get_allowed_applications(
    self, paginated_apps: Iterator[Application], user: User | None = None
) -> list[Application]:
    """Return the applications from ``paginated_apps`` whose policy engine passes.

    Policies are evaluated against the user of ``self.request._request``. When
    ``user`` is given, a shallow copy of that request with ``user`` assigned is
    used instead, leaving the original request object untouched. Input order
    is preserved.
    """
    http_request = self.request._request
    if user:
        http_request = copy(http_request)
        http_request.user = user

    def _passes_policies(application: Application) -> bool:
        # One engine per application, built and queried immediately.
        engine = PolicyEngine(application, http_request.user, http_request)
        engine.empty_result = AppAccessWithoutBindings.get()
        engine.build()
        return engine.passing

    return [application for application in paginated_apps if _passes_policies(application)]
|
|
|
|
def _expand_applications(self, applications: list[Application]) -> QuerySet[Application]:
    """
    Reload the given applications through the viewset queryset

    Applications coming out of the cache carry no prefetched relationships,
    which leads to N+1 queries during serialization when get_provider() is
    called. The returned queryset keeps the order of ``applications``; an
    empty input yields an empty queryset.
    """
    base_queryset = self.get_queryset()
    if not applications:
        return base_queryset.none()
    pks = [app.pk for app in applications]
    # Order rows by the position of their pk in the incoming list.
    positional_order = Case(*(When(pk=pk, then=pos) for pos, pk in enumerate(pks)))
    return base_queryset.filter(pk__in=pks).order_by(positional_order)
|
|
|
|
def _filter_applications_with_launch_url(
    self, paginated_apps: QuerySet[Application]
) -> list[Application]:
    """Keep only the applications whose ``get_launch_url()`` returns a truthy value.

    Input order is preserved and the result is always a new list.
    """
    return [candidate for candidate in paginated_apps if candidate.get_launch_url()]
|
|
|
|
@extend_schema(
    parameters=[
        OpenApiParameter(
            name="for_user",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.INT,
        )
    ],
    responses={
        200: PolicyTestResultSerializer(),
    },
)
@action(detail=True, methods=["GET"])
def check_access(self, request: Request, slug: str) -> Response:
    """Check access to a single application by slug"""
    # Don't use self.get_object as that checks for view_application permission
    # which the user might not have, even if they have access
    application = get_object_or_404(Application, slug=slug)

    # Policies are evaluated for the requesting user unless a superuser
    # names a different user through the `for_user` query parameter.
    target_user = request.user
    if request.user.is_superuser and "for_user" in request.query_params:
        requested_pk = request.query_params.get("for_user")
        try:
            target_user = User.objects.filter(pk=requested_pk).first()
        except ValueError:
            raise ValidationError({"for_user": "for_user must be numerical"}) from None
        if not target_user:
            raise ValidationError({"for_user": "User not found"})

    engine = PolicyEngine(application, target_user, request)
    engine.empty_result = AppAccessWithoutBindings.get()
    engine.use_cache = False
    with capture_logs() as logs:
        engine.build()
        result = engine.result

    if request.user.is_superuser:
        # Superusers receive the full result plus the captured log entries,
        # minus those whose "process" attribute is "PolicyProcess".
        result.log_messages = [
            LogEventSerializer(entry).data
            for entry in logs
            if entry.attributes.get("process", "") != "PolicyProcess"
        ]
        response = PolicyTestResultSerializer(result)
    else:
        # Everyone else only learns whether the check passed.
        response = PolicyTestResultSerializer(PolicyResult(bool(result.passing)))
    return Response(response.data)
|
|
|
|
@extend_schema(
    parameters=[
        OpenApiParameter(
            name="superuser_full_list",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.BOOL,
        ),
        OpenApiParameter(
            name="for_user",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.INT,
        ),
        OpenApiParameter(
            name="only_with_launch_url",
            location=OpenApiParameter.QUERY,
            type=OpenApiTypes.BOOL,
        ),
    ]
)
def list(self, request: Request) -> Response:
    """Custom list method that checks Policy based access instead of guardian"""
    # Query parameters handled here:
    #   superuser_full_list  - "true" makes a superuser get the default
    #                          ModelViewSet listing without policy checks.
    #   only_with_launch_url - "true" drops applications without a launch URL.
    #   for_user             - evaluate policies for another user; the target
    #                          must be visible to the requester through the
    #                          "authentik_core.view_user_applications"
    #                          permission. Results are never cached.
    #   search               - any non-empty value disables caching.
    # Raises ValidationError when for_user is not an integer or matches no
    # permitted user.
    should_cache = request.query_params.get("search", "") == ""

    superuser_full_list = (
        str(request.query_params.get("superuser_full_list", "false")).lower() == "true"
    )
    if superuser_full_list and request.user.is_superuser:
        return super().list(request)

    only_with_launch_url = (
        str(request.query_params.get("only_with_launch_url", "false")).lower() == "true"
    )

    queryset = self._filter_queryset_for_list(self.get_queryset())
    queryset = queryset.exclude(meta_hide=True)
    if only_with_launch_url:
        # Pre-filter at DB level to skip expensive per-app policy evaluation
        # for apps that can never appear in the launcher (no meta_launch_url
        # and no provider, so no possible launch URL).
        queryset = queryset.exclude(meta_launch_url="", provider__isnull=True)
    paginator: Pagination = self.paginator
    paginated_apps = paginator.paginate_queryset(queryset, request)

    if "for_user" in request.query_params:
        # Only the integer conversion can raise ValueError, so only it is
        # guarded; the error detail matches the one used by check_access.
        try:
            for_user_pk = int(request.query_params.get("for_user", 0))
        except ValueError as exc:
            raise ValidationError({"for_user": "for_user must be numerical"}) from exc
        for_user = (
            get_objects_for_user(request.user, "authentik_core.view_user_applications")
            .filter(pk=for_user_pk)
            .first()
        )
        if not for_user:
            raise ValidationError({"for_user": "User not found"})
        allowed_applications = self._get_allowed_applications(paginated_apps, user=for_user)

        serializer = self.get_serializer(allowed_applications, many=True)
        return self.get_paginated_response(serializer.data)

    if should_cache:
        # NOTE(review): the key varies only by user, page number and
        # only_with_launch_url; other filter/ordering query parameters are
        # not part of it - confirm that is intended.
        cache_key = user_app_cache_key(
            self.request.user.pk, paginator.page.number, only_with_launch_url
        )
        cached_applications = cache.get(cache_key)
        # Only None signals a cache miss. An empty list is a valid cached
        # result (no allowed applications on this page) and must not trigger
        # a fresh policy evaluation on every request.
        if cached_applications is not None:
            # Re-fetch cached applications since pickled instances lose prefetched
            # relationships, causing N+1 queries during serialization
            allowed_applications = self._expand_applications(cached_applications)
        else:
            LOGGER.debug("Caching allowed application list", page=paginator.page.number)
            allowed_applications = self._get_allowed_applications(paginated_apps)
            # 86400 seconds = 24 hours
            cache.set(cache_key, allowed_applications, timeout=86400)
    else:
        allowed_applications = self._get_allowed_applications(paginated_apps)

    if only_with_launch_url:
        allowed_applications = self._filter_applications_with_launch_url(allowed_applications)

    serializer = self.get_serializer(allowed_applications, many=True)
    return self.get_paginated_response(serializer.data)
|