outposts: fix permission errors for related certificates (cherry-pick #18861 to version-2025.12) (#18866)

Co-authored-by: Jens L. <jens@goauthentik.io>
fix permission errors for related certificates (#18861)
This commit is contained in:
authentik-automation[bot]
2025-12-16 15:23:49 +01:00
committed by GitHub
parent 5ea85f086a
commit ac45f80551
6 changed files with 111 additions and 51 deletions
+33 -37
View File
@@ -86,7 +86,7 @@ class OutpostConfig:
class OutpostModel(Model):
"""Base model for providers that need more objects than just themselves"""
def get_required_objects(self) -> Iterable[models.Model | str]:
def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]:
"""Return a list of all required objects"""
return [self]
@@ -332,41 +332,35 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel):
"""Create per-object and global permissions for outpost service-account"""
# To ensure the user only has the correct permissions, we delete all of them and re-add
# the ones the user needs
with transaction.atomic():
user.remove_all_perms_from_managed_role()
for model_or_perm in self.get_required_objects():
if isinstance(model_or_perm, models.Model):
model_or_perm: models.Model
code_name = (
f"{model_or_perm._meta.app_label}.view_{model_or_perm._meta.model_name}"
)
try:
user.assign_perms_to_managed_role(code_name, model_or_perm)
except (Permission.DoesNotExist, AttributeError) as exc:
LOGGER.warning(
"permission doesn't exist",
code_name=code_name,
user=user,
model=model_or_perm,
try:
with transaction.atomic():
user.remove_all_perms_from_managed_role()
for model_or_perm in self.get_required_objects():
if isinstance(model_or_perm, models.Model):
code_name = (
f"{model_or_perm._meta.app_label}.view_{model_or_perm._meta.model_name}"
)
Event.new(
action=EventAction.SYSTEM_EXCEPTION,
message=(
"While setting the permissions for the service-account, a "
"permission was not found: Check "
"https://docs.goauthentik.io/troubleshooting/missing_permission"
),
).with_exception(exc).set_user(user).save()
else:
app_label, perm = model_or_perm.split(".")
permission = Permission.objects.filter(
codename=perm,
content_type__app_label=app_label,
)
if not permission.exists():
LOGGER.warning("permission doesn't exist", perm=model_or_perm)
continue
user.assign_perms_to_managed_role(permission.first())
user.assign_perms_to_managed_role(code_name, model_or_perm)
elif isinstance(model_or_perm, tuple):
perm, obj = model_or_perm
user.assign_perms_to_managed_role(perm, obj)
else:
user.assign_perms_to_managed_role(model_or_perm)
except (Permission.DoesNotExist, AttributeError) as exc:
LOGGER.warning(
"permission doesn't exist",
code_name=code_name,
user=user,
model=model_or_perm,
)
Event.new(
action=EventAction.SYSTEM_EXCEPTION,
message=(
"While setting the permissions for the service-account, a "
"permission was not found: Check "
"https://docs.goauthentik.io/troubleshooting/missing_permission"
),
).with_exception(exc).set_user(user).save()
LOGGER.debug(
"Updated service account's permissions",
obj_perms=user.get_all_obj_perms_on_managed_role(),
@@ -431,7 +425,7 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel):
Token.objects.filter(identifier=self.token_identifier).delete()
return self.token
def get_required_objects(self) -> Iterable[models.Model | str]:
def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]:
"""Get an iterator of all objects the user needs read access to"""
objects: list[models.Model | str] = [
self,
@@ -445,7 +439,9 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel):
if self.managed:
for brand in Brand.objects.filter(web_certificate__isnull=False):
objects.append(brand)
objects.append(brand.web_certificate)
objects.append(("view_certificatekeypair", brand.web_certificate))
objects.append(("view_certificatekeypair_certificate", brand.web_certificate))
objects.append(("view_certificatekeypair_key", brand.web_certificate))
return objects
def __str__(self) -> str:
+5 -3
View File
@@ -51,10 +51,12 @@ class OutpostTests(TestCase):
permissions = outpost.user.get_all_obj_perms_on_managed_role().order_by(
"content_type__model"
)
self.assertEqual(len(permissions), 3)
self.assertEqual(len(permissions), 5)
self.assertEqual(permissions[0].object_pk, str(keypair.pk))
self.assertEqual(permissions[1].object_pk, str(outpost.pk))
self.assertEqual(permissions[2].object_pk, str(provider.pk))
self.assertEqual(permissions[1].object_pk, str(keypair.pk))
self.assertEqual(permissions[2].object_pk, str(keypair.pk))
self.assertEqual(permissions[3].object_pk, str(outpost.pk))
self.assertEqual(permissions[4].object_pk, str(provider.pk))
# Remove provider from outpost, user should only have access to outpost
outpost.providers.remove(provider)
+6 -4
View File
@@ -93,11 +93,13 @@ class LDAPProvider(OutpostModel, BackchannelProvider):
def __str__(self):
return f"LDAP Provider {self.name}"
def get_required_objects(self) -> Iterable[models.Model | str]:
required_models = [self, "authentik_core.view_user", "authentik_core.view_group"]
def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]:
required = [self, "authentik_core.view_user", "authentik_core.view_group"]
if self.certificate is not None:
required_models.append(self.certificate)
return required_models
required.append(("view_certificatekeypair", self.certificate))
required.append(("view_certificatekeypair_certificate", self.certificate))
required.append(("view_certificatekeypair_key", self.certificate))
return required
class Meta:
verbose_name = _("LDAP Provider")
+6 -4
View File
@@ -179,11 +179,13 @@ class ProxyProvider(OutpostModel, OAuth2Provider):
def __str__(self):
return f"Proxy Provider {self.name}"
def get_required_objects(self) -> Iterable[models.Model | str]:
required_models = [self]
def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]:
required = [self]
if self.certificate is not None:
required_models.append(self.certificate)
return required_models
required.append(("view_certificatekeypair", self.certificate))
required.append(("view_certificatekeypair_certificate", self.certificate))
required.append(("view_certificatekeypair_key", self.certificate))
return required
class Meta:
verbose_name = _("Proxy Provider")
+57 -1
View File
@@ -1,10 +1,14 @@
"""proxy provider tests"""
from json import loads
from django.urls import reverse
from rest_framework.test import APITestCase
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.core.models import Application
from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow
from authentik.lib.generators import generate_id
from authentik.outposts.models import Outpost, OutpostType
from authentik.providers.oauth2.models import ClientTypes
from authentik.providers.proxy.models import ProxyMode, ProxyProvider
@@ -127,3 +131,55 @@ class ProxyProviderTests(APITestCase):
self.assertEqual(response.status_code, 200)
provider: ProxyProvider = ProxyProvider.objects.get(name=name)
self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL)
def test_sa_fetch(self):
"""Test fetching the outpost config as the service account"""
outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
provider = ProxyProvider.objects.create(name=generate_id())
Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
outpost.providers.add(provider)
res = self.client.get(
reverse("authentik_api:proxyprovideroutpost-list"),
HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
)
body = loads(res.content)
self.assertEqual(body["pagination"]["count"], 1)
def test_sa_perms_cert(self):
"""Test permissions to access a configured certificate"""
cert = create_test_cert()
outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY)
provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert)
Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider)
outpost.providers.add(provider)
res = self.client.get(
reverse("authentik_api:proxyprovideroutpost-list"),
HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
)
body = loads(res.content)
self.assertEqual(body["pagination"]["count"], 1)
cert_id = body["results"][0]["certificate"]
self.assertEqual(cert_id, str(cert.pk))
res = self.client.get(
reverse(
"authentik_api:certificatekeypair-view-certificate",
kwargs={
"pk": cert_id,
},
),
HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
)
self.assertEqual(res.status_code, 200)
# res = self.client.get(
# reverse(
# "authentik_api:certificatekeypair-view-private-key",
# kwargs={
# "pk": cert_id,
# },
# ),
# HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}",
# )
# self.assertEqual(res.status_code, 200)
+4 -2
View File
@@ -64,10 +64,12 @@ class RadiusProvider(OutpostModel, Provider):
return RadiusProviderSerializer
def get_required_objects(self) -> Iterable[models.Model | str]:
def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]:
required = [self, "authentik_stages_mtls.pass_outpost_certificate"]
if self.certificate is not None:
required.append(self.certificate)
required.append(("view_certificatekeypair", self.certificate))
required.append(("view_certificatekeypair_certificate", self.certificate))
required.append(("view_certificatekeypair_key", self.certificate))
return required
def __str__(self):