diff --git a/authentik/outposts/models.py b/authentik/outposts/models.py index 31cc465a4b..086cb507ce 100644 --- a/authentik/outposts/models.py +++ b/authentik/outposts/models.py @@ -86,7 +86,7 @@ class OutpostConfig: class OutpostModel(Model): """Base model for providers that need more objects than just themselves""" - def get_required_objects(self) -> Iterable[models.Model | str]: + def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]: """Return a list of all required objects""" return [self] @@ -332,41 +332,35 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel): """Create per-object and global permissions for outpost service-account""" # To ensure the user only has the correct permissions, we delete all of them and re-add # the ones the user needs - with transaction.atomic(): - user.remove_all_perms_from_managed_role() - for model_or_perm in self.get_required_objects(): - if isinstance(model_or_perm, models.Model): - model_or_perm: models.Model - code_name = ( - f"{model_or_perm._meta.app_label}.view_{model_or_perm._meta.model_name}" - ) - try: - user.assign_perms_to_managed_role(code_name, model_or_perm) - except (Permission.DoesNotExist, AttributeError) as exc: - LOGGER.warning( - "permission doesn't exist", - code_name=code_name, - user=user, - model=model_or_perm, + try: + with transaction.atomic(): + user.remove_all_perms_from_managed_role() + for model_or_perm in self.get_required_objects(): + if isinstance(model_or_perm, models.Model): + code_name = ( + f"{model_or_perm._meta.app_label}.view_{model_or_perm._meta.model_name}" ) - Event.new( - action=EventAction.SYSTEM_EXCEPTION, - message=( - "While setting the permissions for the service-account, a " - "permission was not found: Check " - "https://docs.goauthentik.io/troubleshooting/missing_permission" - ), - ).with_exception(exc).set_user(user).save() - else: - app_label, perm = model_or_perm.split(".") - permission = Permission.objects.filter( - codename=perm, - content_type__app_label=app_label, - ) - if not permission.exists(): - LOGGER.warning("permission doesn't exist", perm=model_or_perm) - continue - user.assign_perms_to_managed_role(permission.first()) + user.assign_perms_to_managed_role(code_name, model_or_perm) + elif isinstance(model_or_perm, tuple): + perm, obj = model_or_perm + user.assign_perms_to_managed_role(perm, obj) + else: + user.assign_perms_to_managed_role(model_or_perm) + except (Permission.DoesNotExist, AttributeError) as exc: + LOGGER.warning( + "permission doesn't exist", + code_name=code_name, + user=user, + model=model_or_perm, + ) + Event.new( + action=EventAction.SYSTEM_EXCEPTION, + message=( + "While setting the permissions for the service-account, a " + "permission was not found: Check " + "https://docs.goauthentik.io/troubleshooting/missing_permission" + ), + ).with_exception(exc).set_user(user).save() LOGGER.debug( "Updated service account's permissions", obj_perms=user.get_all_obj_perms_on_managed_role(), @@ -431,7 +425,7 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel): Token.objects.filter(identifier=self.token_identifier).delete() return self.token - def get_required_objects(self) -> Iterable[models.Model | str]: + def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]: """Get an iterator of all objects the user needs read access to""" objects: list[models.Model | str] = [ self, @@ -445,7 +439,9 @@ class Outpost(ScheduledModel, SerializerModel, ManagedModel): if self.managed: for brand in Brand.objects.filter(web_certificate__isnull=False): objects.append(brand) - objects.append(brand.web_certificate) + objects.append(("view_certificatekeypair", brand.web_certificate)) + objects.append(("view_certificatekeypair_certificate", brand.web_certificate)) + objects.append(("view_certificatekeypair_key", brand.web_certificate)) return objects def __str__(self) -> str: diff --git a/authentik/outposts/tests/test_sa.py b/authentik/outposts/tests/test_sa.py index 164804e822..8f1008f811 100644 --- a/authentik/outposts/tests/test_sa.py +++ b/authentik/outposts/tests/test_sa.py @@ -51,10 +51,12 @@ class OutpostTests(TestCase): permissions = outpost.user.get_all_obj_perms_on_managed_role().order_by( "content_type__model" ) - self.assertEqual(len(permissions), 3) + self.assertEqual(len(permissions), 5) self.assertEqual(permissions[0].object_pk, str(keypair.pk)) - self.assertEqual(permissions[1].object_pk, str(outpost.pk)) - self.assertEqual(permissions[2].object_pk, str(provider.pk)) + self.assertEqual(permissions[1].object_pk, str(keypair.pk)) + self.assertEqual(permissions[2].object_pk, str(keypair.pk)) + self.assertEqual(permissions[3].object_pk, str(outpost.pk)) + self.assertEqual(permissions[4].object_pk, str(provider.pk)) # Remove provider from outpost, user should only have access to outpost outpost.providers.remove(provider) diff --git a/authentik/providers/ldap/models.py b/authentik/providers/ldap/models.py index e6cd97b0de..59b4ae2762 100644 --- a/authentik/providers/ldap/models.py +++ b/authentik/providers/ldap/models.py @@ -93,11 +93,13 @@ class LDAPProvider(OutpostModel, BackchannelProvider): def __str__(self): return f"LDAP Provider {self.name}" - def get_required_objects(self) -> Iterable[models.Model | str]: - required_models = [self, "authentik_core.view_user", "authentik_core.view_group"] + def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]: + required = [self, "authentik_core.view_user", "authentik_core.view_group"] if self.certificate is not None: - required_models.append(self.certificate) - return required_models + required.append(("view_certificatekeypair", self.certificate)) + required.append(("view_certificatekeypair_certificate", self.certificate)) + required.append(("view_certificatekeypair_key", self.certificate)) + return required class Meta: verbose_name = _("LDAP Provider") diff --git a/authentik/providers/proxy/models.py b/authentik/providers/proxy/models.py index 980b9a314b..682dd2e04c 100644 --- a/authentik/providers/proxy/models.py +++ b/authentik/providers/proxy/models.py @@ -179,11 +179,13 @@ class ProxyProvider(OutpostModel, OAuth2Provider): def __str__(self): return f"Proxy Provider {self.name}" - def get_required_objects(self) -> Iterable[models.Model | str]: - required_models = [self] + def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]: + required = [self] if self.certificate is not None: - required_models.append(self.certificate) - return required_models + required.append(("view_certificatekeypair", self.certificate)) + required.append(("view_certificatekeypair_certificate", self.certificate)) + required.append(("view_certificatekeypair_key", self.certificate)) + return required class Meta: verbose_name = _("Proxy Provider") diff --git a/authentik/providers/proxy/tests.py b/authentik/providers/proxy/tests.py index 08b0a27e16..25d555f34e 100644 --- a/authentik/providers/proxy/tests.py +++ b/authentik/providers/proxy/tests.py @@ -1,10 +1,14 @@ """proxy provider tests""" +from json import loads + from django.urls import reverse from rest_framework.test import APITestCase -from authentik.core.tests.utils import create_test_admin_user, create_test_flow +from authentik.core.models import Application +from authentik.core.tests.utils import create_test_admin_user, create_test_cert, create_test_flow from authentik.lib.generators import generate_id +from authentik.outposts.models import Outpost, OutpostType from authentik.providers.oauth2.models import ClientTypes from authentik.providers.proxy.models import ProxyMode, ProxyProvider @@ -127,3 +131,55 @@ class ProxyProviderTests(APITestCase): self.assertEqual(response.status_code, 200) provider: ProxyProvider = ProxyProvider.objects.get(name=name) self.assertEqual(provider.client_type, ClientTypes.CONFIDENTIAL) + + def test_sa_fetch(self): + """Test fetching the outpost config as the service account""" + outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) + provider = ProxyProvider.objects.create(name=generate_id()) + Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) + outpost.providers.add(provider) + + res = self.client.get( + reverse("authentik_api:proxyprovideroutpost-list"), + HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", + ) + body = loads(res.content) + self.assertEqual(body["pagination"]["count"], 1) + + def test_sa_perms_cert(self): + """Test permissions to access a configured certificate""" + cert = create_test_cert() + outpost = Outpost.objects.create(name=generate_id(), type=OutpostType.PROXY) + provider = ProxyProvider.objects.create(name=generate_id(), certificate=cert) + Application.objects.create(name=generate_id(), slug=generate_id(), provider=provider) + outpost.providers.add(provider) + + res = self.client.get( + reverse("authentik_api:proxyprovideroutpost-list"), + HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", + ) + body = loads(res.content) + self.assertEqual(body["pagination"]["count"], 1) + cert_id = body["results"][0]["certificate"] + self.assertEqual(cert_id, str(cert.pk)) + + res = self.client.get( + reverse( + "authentik_api:certificatekeypair-view-certificate", + kwargs={ + "pk": cert_id, + }, + ), + HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", + ) + self.assertEqual(res.status_code, 200) + # res = self.client.get( + # reverse( + # "authentik_api:certificatekeypair-view-private-key", + # kwargs={ + # "pk": cert_id, + # }, + # ), + # HTTP_AUTHORIZATION=f"Bearer {outpost.token.key}", + # ) + # self.assertEqual(res.status_code, 200) diff --git a/authentik/providers/radius/models.py b/authentik/providers/radius/models.py index 1955da9f7d..ed53c3c48a 100644 --- a/authentik/providers/radius/models.py +++ b/authentik/providers/radius/models.py @@ -64,10 +64,12 @@ class RadiusProvider(OutpostModel, Provider): return RadiusProviderSerializer - def get_required_objects(self) -> Iterable[models.Model | str]: + def get_required_objects(self) -> Iterable[models.Model | str | tuple[str, models.Model]]: required = [self, "authentik_stages_mtls.pass_outpost_certificate"] if self.certificate is not None: - required.append(self.certificate) + required.append(("view_certificatekeypair", self.certificate)) + required.append(("view_certificatekeypair_certificate", self.certificate)) + required.append(("view_certificatekeypair_key", self.certificate)) return required def __str__(self):