replace request.tenant with system settings

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2026-06-17 17:52:16 +02:00
parent 43de4151bc
commit 4ec6f6b28e
15 changed files with 35 additions and 370 deletions
+2
View File
@@ -3,6 +3,7 @@
from django.urls import path
from authentik.admin.api.meta import AppsViewSet, ModelViewSet
from authentik.admin.api.settings import SettingsView
from authentik.admin.api.system import SystemView
from authentik.admin.api.version import VersionView
from authentik.admin.api.version_history import VersionHistoryViewSet
@@ -13,4 +14,5 @@ api_urlpatterns = [
path("admin/version/", VersionView.as_view(), name="admin_version"),
("admin/version/history", VersionHistoryViewSet, "version_history"),
path("admin/system/", SystemView.as_view(), name="admin_system"),
path("admin/settings/", SettingsView.as_view(), name="admin_settings"),
]
+5
View File
@@ -0,0 +1,5 @@
from authentik.admin.models import SystemSettings
def get_system_settings() -> SystemSettings:
return SystemSettings.objects.get(pk=True)
+5 -2
View File
@@ -6,6 +6,7 @@ from drf_spectacular.plumbing import build_object_type
from rest_framework import pagination
from rest_framework.response import Response
from authentik.admin.utils import get_system_settings
from authentik.api.search.ql import QLSearch
from authentik.api.v3.schema.pagination import PAGINATION
from authentik.api.v3.schema.search import AUTOCOMPLETE_SCHEMA
@@ -25,8 +26,10 @@ class Pagination(pagination.PageNumberPagination):
if self.page_size_query_param in request.query_params:
page_size = super().get_page_size(request)
if page_size is not None:
return min(super().get_page_size(request), request.tenant.pagination_max_page_size)
return request.tenant.pagination_default_page_size
return min(
super().get_page_size(request), get_system_settings().pagination_max_page_size
)
return get_system_settings().pagination_default_page_size
def get_paginated_response(self, data) -> Response:
previous_page_number = 0
+2 -1
View File
@@ -20,6 +20,7 @@ from rest_framework.views import APIView
from authentik.admin.files.manager import get_file_manager
from authentik.admin.files.usage import FileUsage
from authentik.admin.utils import get_system_settings
from authentik.core.api.utils import PassiveSerializer
from authentik.events.context_processors.base import get_context_processors
from authentik.lib.config import CONFIG
@@ -76,7 +77,7 @@ class ConfigView(APIView):
for processor in get_context_processors():
if cap := processor.capability():
caps.append(cap)
if request.tenant.impersonation:
if get_system_settings().impersonation:
caps.append(Capabilities.CAN_IMPERSONATE)
if settings.DEBUG: # pragma: no cover
caps.append(Capabilities.CAN_DEBUG)
+5 -10
View File
@@ -49,19 +49,14 @@ from rest_framework.fields import (
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import (
ListSerializer,
PrimaryKeyRelatedField,
)
from rest_framework.serializers import ListSerializer, PrimaryKeyRelatedField
from rest_framework.validators import UniqueValidator
from rest_framework.viewsets import ModelViewSet
from structlog.stdlib import get_logger
from authentik.admin.utils import get_system_settings
from authentik.api.authentication import TokenAuthentication
from authentik.api.search.fields import (
ChoiceSearchField,
JSONSearchField,
)
from authentik.api.search.fields import ChoiceSearchField, JSONSearchField
from authentik.api.validation import validate
from authentik.blueprints.v1.importer import SERIALIZER_CONTEXT_BLUEPRINT
from authentik.brands.models import Brand
@@ -960,7 +955,7 @@ class UserViewSet(
@action(detail=True, methods=["POST"], permission_classes=[IsAuthenticated])
def impersonate(self, request: Request, pk: int) -> Response:
"""Impersonate a user"""
if not request.tenant.impersonation:
if not get_system_settings().impersonation:
LOGGER.debug("User attempted to impersonate", user=request.user)
return Response(status=401)
user_to_be = self.get_object()
@@ -977,7 +972,7 @@ class UserViewSet(
if user_to_be.pk == self.request.user.pk:
LOGGER.debug("User attempted to impersonate themselves", user=request.user)
return Response(status=401)
if not reason and request.tenant.impersonation_require_reason:
if not reason and get_system_settings().impersonation_require_reason:
LOGGER.debug(
"User attempted to impersonate without providing a reason",
user=request.user,
+4 -11
View File
@@ -13,9 +13,9 @@ from lxml import etree # nosec
from lxml.etree import Element, SubElement, _Element # nosec
from requests.exceptions import ConnectionError, HTTPError, RequestException, Timeout
from authentik.admin.utils import get_system_settings
from authentik.lib.utils.dict import get_path_from_dict
from authentik.lib.utils.http import get_http_session
from authentik.tenants.utils import get_current_tenant
if TYPE_CHECKING:
from authentik.core.models import User
@@ -60,9 +60,7 @@ def avatar_mode_gravatar(user: User, mode: str) -> str | None:
def generate_colors(text: str) -> tuple[str, str]:
"""Generate colors based on `text`"""
color = (
int(md5(text.lower().encode("utf-8"), usedforsecurity=False).hexdigest(), 16) % 0xFFFFFF
) # nosec
color = int(md5(text.lower().encode("utf-8"), usedforsecurity=False).hexdigest(), 16) % 0xFFFFFF # nosec
# Get a (somewhat arbitrarily) reduced scope of colors
# to avoid too dark or light backgrounds
@@ -127,7 +125,7 @@ def generate_avatar_from_name(
text.attrib["x"] = "50%"
text.attrib["y"] = "50%"
text.attrib["style"] = (
f"color: #{text_hex}; " "line-height: 1; " f"font-family: {','.join(SVG_FONTS)}; "
f"color: #{text_hex}; line-height: 1; font-family: {','.join(SVG_FONTS)}; "
)
text.attrib["fill"] = f"#{text_hex}"
text.attrib["alignment-baseline"] = "middle"
@@ -204,12 +202,7 @@ def get_avatar(user: User, request: HttpRequest | None = None) -> str:
"initials": avatar_mode_generated,
"gravatar": avatar_mode_gravatar,
}
tenant = None
if request:
tenant = request.tenant
else:
tenant = get_current_tenant()
modes: str = tenant.avatars
modes: str = get_system_settings().avatars
for mode in modes.split(","):
avatar = None
if mode in mode_map:
+2 -1
View File
@@ -16,6 +16,7 @@ from scim2_filter_parser.transpilers.django_q_object import get_query
from structlog import BoundLogger
from structlog.stdlib import get_logger
from authentik.admin.utils import get_system_settings
from authentik.core.models import Group, User
from authentik.core.sources.mapper import SourceMapper
from authentik.lib.sync.mapper import PropertyMappingManager
@@ -85,7 +86,7 @@ class SCIMView(APIView):
)
def paginate_query(self, query: QuerySet) -> Page:
per_page = int(self.request.tenant.pagination_default_page_size)
per_page = int(get_system_settings().pagination_default_page_size)
start_index = 1
try:
start_index = int(self.request.query_params.get("startIndex", 1))
@@ -4,6 +4,7 @@ from django.conf import settings
from rest_framework.request import Request
from rest_framework.response import Response
from authentik.admin.utils import get_system_settings
from authentik.sources.scim.views.v2.base import SCIMView
@@ -37,7 +38,7 @@ class ServiceProviderConfigView(SCIMView):
"bulk": {"supported": False, "maxOperations": 0, "maxPayloadSize": 0},
"filter": {
"supported": True,
"maxResults": request.tenant.pagination_default_page_size,
"maxResults": get_system_settings().pagination_default_page_size,
},
"changePassword": {"supported": False},
"sort": {"supported": False},
View File
-42
View File
@@ -1,42 +0,0 @@
"""Serializer for tenants models"""
from django.apps import apps
from django.http import HttpResponseNotFound
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import ModelViewSet
from authentik.core.api.utils import ModelSerializer
from authentik.tenants.api.tenants import TenantApiKeyAuthentication
from authentik.tenants.models import Domain
class DomainSerializer(ModelSerializer):
"""Domain Serializer"""
class Meta:
model = Domain
fields = "__all__"
class DomainViewSet(ModelViewSet):
"""Domain ViewSet"""
queryset = Domain.objects.all()
serializer_class = DomainSerializer
search_fields = [
"domain",
"tenant__name",
"tenant__schema_name",
]
ordering = ["domain"]
authentication_classes = [TenantApiKeyAuthentication]
permission_classes = [IsAuthenticated]
filter_backends = [OrderingFilter, SearchFilter]
filterset_fields = []
def dispatch(self, request, *args, **kwargs):
# This only checks the license in the default tenant, which is what we want
if not apps.get_app_config("authentik_enterprise").enabled():
return HttpResponseNotFound()
return super().dispatch(request, *args, **kwargs)
-125
View File
@@ -1,125 +0,0 @@
"""Serializer for tenants models"""
from typing import get_args
from django.utils.translation import gettext_lazy as _
from django_tenants.utils import get_public_schema_name
from drf_spectacular.extensions import OpenApiSerializerFieldExtension
from drf_spectacular.plumbing import build_basic_type, build_object_type
from rest_framework.exceptions import ValidationError
from rest_framework.fields import JSONField
from rest_framework.generics import RetrieveUpdateAPIView
from rest_framework.permissions import SAFE_METHODS
from authentik.core.api.utils import JSONDictField, ModelSerializer
from authentik.rbac.permissions import HasPermission
from authentik.tenants.flags import Flag
from authentik.tenants.models import Tenant
class FlagJSONField(JSONDictField):
def to_internal_value(self, data: str):
flags = super().to_internal_value(data)
for flag in Flag.available(visibility="system", exclude_system=False):
flags[flag().key] = flag.get()
return flags
def to_representation(self, value: dict) -> dict:
new_value = value.copy()
for flag in Flag.available(exclude_system=False):
_flag = flag()
# Exclude any system flags that aren't modifiable
if _flag.visibility == "system":
new_value.pop(_flag.key, None)
# Explicitly present unset flags as if they were set to default
if _flag.key not in value:
value[_flag.key] = _flag.default
return super().to_representation(new_value)
def run_validators(self, value: dict):
super().run_validators(value)
for flag in Flag.available():
_flag = flag()
if _flag.key not in value:
continue
flag_value = value.get(_flag.key)
flag_type = get_args(_flag.__orig_bases__[0])[0]
if flag_value and not isinstance(flag_value, flag_type):
raise ValidationError(
_("Value for flag {flag_key} needs to be of type {type}.").format(
flag_key=_flag.key, type=flag_type.__name__
)
)
class FlagsJSONExtension(OpenApiSerializerFieldExtension):
"""Generate API Schema for JSON fields as"""
target_class = "authentik.tenants.api.settings.FlagJSONField"
def map_serializer_field(self, auto_schema, direction):
props = {}
for flag in Flag.available():
_flag = flag()
props[_flag.key] = build_basic_type(get_args(_flag.__orig_bases__[0])[0])
if _flag.description:
props[_flag.key]["description"] = _flag.description
if _flag.deprecated:
props[_flag.key]["deprecated"] = _flag.deprecated
return build_object_type(props, required=props.keys())
class SettingsSerializer(ModelSerializer):
"""Settings Serializer"""
footer_links = JSONField(required=False)
flags = FlagJSONField()
class Meta:
model = Tenant
fields = [
"avatars",
"default_user_change_name",
"default_user_change_email",
"default_user_change_username",
"event_retention",
"reputation_lower_limit",
"reputation_upper_limit",
"footer_links",
"gdpr_compliance",
"impersonation",
"impersonation_require_reason",
"default_token_duration",
"default_token_length",
"pagination_default_page_size",
"pagination_max_page_size",
"flags",
]
class SettingsView(RetrieveUpdateAPIView):
"""Settings view"""
queryset = Tenant.objects.filter(ready=True)
serializer_class = SettingsSerializer
filter_backends = []
def get_permissions(self):
return [
HasPermission(
"authentik_rbac.view_system_settings"
if self.request.method in SAFE_METHODS
else "authentik_rbac.edit_system_settings"
)()
]
def get_object(self):
obj = self.request.tenant
self.check_object_permissions(self.request, obj)
return obj
def perform_update(self, serializer):
# We need to be in the public schema to actually modify a tenant
with Tenant.objects.get(schema_name=get_public_schema_name()):
super().perform_update(serializer)
-149
View File
@@ -1,149 +0,0 @@
"""Serializer for tenants models"""
from datetime import timedelta
from hmac import compare_digest
from django.apps import apps
from django.http import HttpResponseNotFound
from django.http.request import urljoin
from django.utils.timezone import now
from drf_spectacular.utils import OpenApiResponse, extend_schema
from rest_framework.authentication import BaseAuthentication, get_authorization_header
from rest_framework.decorators import action
from rest_framework.fields import CharField, IntegerField
from rest_framework.filters import OrderingFilter, SearchFilter
from rest_framework.permissions import IsAuthenticated
from rest_framework.request import Request
from rest_framework.response import Response
from rest_framework.serializers import DateTimeField
from rest_framework.viewsets import ModelViewSet
from authentik.api.authentication import IPCUser, validate_auth
from authentik.core.api.utils import ModelSerializer, PassiveSerializer
from authentik.core.models import User
from authentik.lib.config import CONFIG
from authentik.recovery.lib import create_admin_group, create_recovery_token
from authentik.tenants.models import Tenant
class TenantApiKeyAuthentication(BaseAuthentication):
"""Authentication based on tenants.api_key"""
def authenticate(self, request: Request) -> bool:
key = CONFIG.get("tenants.api_key", "")
if not key:
return None
token = validate_auth(get_authorization_header(request))
if token is None:
return None
if not compare_digest(token, key):
return None
return (IPCUser(), None)
class TenantSerializer(ModelSerializer):
"""Tenant Serializer"""
class Meta:
model = Tenant
fields = [
"tenant_uuid",
"schema_name",
"name",
"ready",
]
class TenantAdminGroupRequestSerializer(PassiveSerializer):
"""Tenant admin group creation request serializer"""
user = CharField()
class TenantRecoveryKeyRequestSerializer(PassiveSerializer):
"""Tenant recovery key creation request serializer"""
user = CharField()
duration_days = IntegerField(initial=365)
class TenantRecoveryKeyResponseSerializer(PassiveSerializer):
"""Tenant recovery key creation response serializer"""
expiry = DateTimeField()
url = CharField()
class TenantViewSet(ModelViewSet):
"""Tenant Viewset"""
queryset = Tenant.objects.all()
serializer_class = TenantSerializer
search_fields = [
"name",
"schema_name",
"domains__domain",
]
ordering = ["schema_name"]
authentication_classes = [TenantApiKeyAuthentication]
permission_classes = [IsAuthenticated]
filter_backends = [OrderingFilter, SearchFilter]
filterset_fields = []
def dispatch(self, request, *args, **kwargs):
# This only checks the license in the default tenant, which is what we want
if not apps.get_app_config("authentik_enterprise").enabled():
return HttpResponseNotFound()
return super().dispatch(request, *args, **kwargs)
@extend_schema(
request=TenantAdminGroupRequestSerializer(),
responses={
204: OpenApiResponse(description="Group created successfully."),
400: OpenApiResponse(description="Bad request"),
404: OpenApiResponse(description="User not found"),
},
)
@action(detail=True, pagination_class=None, methods=["POST"])
def create_admin_group(self, request: Request, pk: str) -> Response:
"""Create admin group and add user to it."""
tenant = self.get_object()
with tenant:
data = TenantAdminGroupRequestSerializer(data=request.data)
data.is_valid(raise_exception=True)
user = User.objects.filter(username=data.validated_data.get("user")).first()
if not user:
return Response(status=404)
_ = create_admin_group(user)
return Response(status=200)
@extend_schema(
request=TenantRecoveryKeyRequestSerializer(),
responses={
200: TenantRecoveryKeyResponseSerializer(),
400: OpenApiResponse(description="Bad request"),
404: OpenApiResponse(description="User not found"),
},
)
@action(detail=True, pagination_class=None, methods=["POST"])
def create_recovery_key(self, request: Request, pk: str) -> Response:
"""Create recovery key for user."""
tenant = self.get_object()
with tenant:
data = TenantRecoveryKeyRequestSerializer(data=request.data)
data.is_valid(raise_exception=True)
user = User.objects.filter(username=data.validated_data.get("user")).first()
if not user:
return Response(status=404)
expiry = now() + timedelta(days=data.validated_data.get("duration_days"))
token, url = create_recovery_token(user, expiry, "tenants API")
domain = tenant.get_primary_domain()
host = domain.domain if domain else request.get_host()
url = urljoin(f"{request.scheme}://{host}", url)
serializer = TenantRecoveryKeyResponseSerializer({"expiry": token.expires, "url": url})
return Response(serializer.data)
@@ -1,12 +1,11 @@
"""Test event retention"""
from django.test.client import RequestFactory
from django_tenants.utils import get_public_schema_name
from rest_framework.test import APITestCase
from authentik.admin.utils import get_system_settings
from authentik.events.models import Event, EventAction
from authentik.lib.utils.time import timedelta_from_string
from authentik.tenants.models import Tenant
class TestEventRetention(APITestCase):
@@ -14,12 +13,11 @@ class TestEventRetention(APITestCase):
def test_event_retention(self):
"""Test brand's event retention"""
default_tenant = Tenant.objects.get(schema_name=get_public_schema_name())
default_tenant.event_retention = "weeks=3"
default_tenant.save()
settings = get_system_settings()
settings.event_retention = "weeks=3"
settings.save()
factory = RequestFactory()
request = factory.get("/")
request.tenant = default_tenant
event = Event.new(action=EventAction.SYSTEM_EXCEPTION, message="test").from_http(request)
self.assertEqual(event.expires.day, (event.created + timedelta_from_string("weeks=3")).day)
self.assertEqual(
-19
View File
@@ -1,19 +0,0 @@
"""API URLs"""
from django.conf import settings
from django.urls import path
from authentik.lib.config import CONFIG
from authentik.tenants.api.domains import DomainViewSet
from authentik.tenants.api.settings import SettingsView
from authentik.tenants.api.tenants import TenantViewSet
api_urlpatterns = [
path("admin/settings/", SettingsView.as_view(), name="tenant_settings"),
]
if CONFIG.get_bool("tenants.enabled", True) or settings.TEST:
api_urlpatterns += [
("tenants/tenants", TenantViewSet),
("tenants/domains", DomainViewSet),
]
@@ -85,6 +85,7 @@ entries:
model: authentik_stages_prompt.prompt
- attrs:
expression: |
from authentik.admin.utils import get_system_settings
from authentik.core.models import (
USER_ATTRIBUTE_CHANGE_EMAIL,
USER_ATTRIBUTE_CHANGE_NAME,
@@ -93,21 +94,21 @@ entries:
prompt_data = request.context.get("prompt_data")
if not request.user.group_attributes(request.http_request).get(
USER_ATTRIBUTE_CHANGE_EMAIL, request.http_request.tenant.default_user_change_email
USER_ATTRIBUTE_CHANGE_EMAIL, get_system_settings().default_user_change_email
):
if prompt_data.get("email") != request.user.email:
ak_message("Not allowed to change email address.")
return False
if not request.user.group_attributes(request.http_request).get(
USER_ATTRIBUTE_CHANGE_NAME, request.http_request.tenant.default_user_change_name
USER_ATTRIBUTE_CHANGE_NAME, get_system_settings().default_user_change_name
):
if prompt_data.get("name") != request.user.name:
ak_message("Not allowed to change name.")
return False
if not request.user.group_attributes(request.http_request).get(
USER_ATTRIBUTE_CHANGE_USERNAME, request.http_request.tenant.default_user_change_username
USER_ATTRIBUTE_CHANGE_USERNAME, get_system_settings().default_user_change_username
):
if prompt_data.get("username") != request.user.username:
ak_message("Not allowed to change username.")