providers/oauth: make rp init logout oidc certification changes (#21815)

* providers/oauth: make rp init logout oidc certification changes

* update test

* slight rework

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix tests

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add oidc certification tests

* test

* fix backchannel url

* make urls uniform

* update to main

* remove env bind

* cleanup patch

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fixup

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* add traefik healthcheck

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

* fix healthcheck

Signed-off-by: Jens Langhammer <jens@goauthentik.io>

---------

Signed-off-by: Jens Langhammer <jens@goauthentik.io>
Co-authored-by: Jens Langhammer <jens@goauthentik.io>
This commit is contained in:
Connor Peshek
2026-05-04 12:11:59 -05:00
committed by GitHub
parent 88a545f4fb
commit a3b0180049
11 changed files with 228 additions and 42 deletions
+6
View File
@@ -286,6 +286,12 @@ jobs:
glob: tests/openid_conformance/test_oidc_basic.py
- name: oidc_implicit
glob: tests/openid_conformance/test_oidc_implicit.py
- name: oidc_rp-initiated
glob: tests/openid_conformance/test_oidc_rp_initiated.py
- name: oidc_frontchannel
glob: tests/openid_conformance/test_oidc_frontchannel.py
- name: oidc_backchannel
glob: tests/openid_conformance/test_oidc_backchannel.py
- name: ssf_transmitter
glob: tests/openid_conformance/test_ssf_transmitter.py
steps:
@@ -53,6 +53,16 @@ class TestEndSessionView(OAuthTestCase):
self.brand.flow_invalidation = self.invalidation_flow
self.brand.save()
def _id_token_hint(self, host: str) -> str:
"""Issue a valid id_token_hint for the test provider under the given host."""
return self.provider.encode(
{
"iss": f"http://{host}/application/o/{self.app.slug}/",
"aud": self.provider.client_id,
"sub": str(self.user.pk),
}
)
def test_post_logout_redirect_uri_strict_match(self):
"""Test strict URI matching redirects to flow"""
self.client.force_login(self.user)
@@ -61,7 +71,10 @@ class TestEndSessionView(OAuthTestCase):
"authentik_providers_oauth2:end-session",
kwargs={"application_slug": self.app.slug},
),
{"post_logout_redirect_uri": "http://testserver/logout"},
{
"post_logout_redirect_uri": "http://testserver/logout",
"id_token_hint": self._id_token_hint(self.brand.domain),
},
HTTP_HOST=self.brand.domain,
)
# Should redirect to the invalidation flow
@@ -69,7 +82,12 @@ class TestEndSessionView(OAuthTestCase):
self.assertIn(self.invalidation_flow.slug, response.url)
def test_post_logout_redirect_uri_strict_no_match(self):
"""Test strict URI not matching still proceeds with flow (no redirect URI in context)"""
"""Test strict URI not matching returns an error and does not start logout flow.
Required by OIDC RP-Initiated Logout 1.0: on an unregistered
post_logout_redirect_uri, the OP MUST NOT redirect and MUST NOT proceed with
logout that targets the RP.
"""
self.client.force_login(self.user)
invalid_uri = "http://testserver/other"
response = self.client.get(
@@ -77,12 +95,14 @@ class TestEndSessionView(OAuthTestCase):
"authentik_providers_oauth2:end-session",
kwargs={"application_slug": self.app.slug},
),
{"post_logout_redirect_uri": invalid_uri},
{
"post_logout_redirect_uri": invalid_uri,
"id_token_hint": self._id_token_hint(self.brand.domain),
},
HTTP_HOST=self.brand.domain,
)
# Should still redirect to flow, but invalid URI should not be in response
self.assertEqual(response.status_code, 302)
self.assertNotIn(invalid_uri, response.url)
self.assertEqual(response.status_code, 400)
self.assertNotIn(invalid_uri, response.content.decode())
def test_post_logout_redirect_uri_regex_match(self):
"""Test regex URI matching redirects to flow"""
@@ -92,7 +112,10 @@ class TestEndSessionView(OAuthTestCase):
"authentik_providers_oauth2:end-session",
kwargs={"application_slug": self.app.slug},
),
{"post_logout_redirect_uri": "https://app.example.com/logout"},
{
"post_logout_redirect_uri": "https://app.example.com/logout",
"id_token_hint": self._id_token_hint(self.brand.domain),
},
HTTP_HOST=self.brand.domain,
)
# Should redirect to the invalidation flow
@@ -100,7 +123,7 @@ class TestEndSessionView(OAuthTestCase):
self.assertIn(self.invalidation_flow.slug, response.url)
def test_post_logout_redirect_uri_regex_no_match(self):
"""Test regex URI not matching"""
"""Test regex URI not matching returns an error and does not start logout flow."""
self.client.force_login(self.user)
invalid_uri = "https://malicious.com/logout"
response = self.client.get(
@@ -108,12 +131,14 @@ class TestEndSessionView(OAuthTestCase):
"authentik_providers_oauth2:end-session",
kwargs={"application_slug": self.app.slug},
),
{"post_logout_redirect_uri": invalid_uri},
{
"post_logout_redirect_uri": invalid_uri,
"id_token_hint": self._id_token_hint(self.brand.domain),
},
HTTP_HOST=self.brand.domain,
)
# Should still proceed to flow, but invalid URI should not be in response
self.assertEqual(response.status_code, 302)
self.assertNotIn(invalid_uri, response.url)
self.assertEqual(response.status_code, 400)
self.assertNotIn(invalid_uri, response.content.decode())
def test_state_parameter_appended_to_uri(self):
"""Test state parameter is appended to validated redirect URI"""
@@ -123,6 +148,7 @@ class TestEndSessionView(OAuthTestCase):
{
"post_logout_redirect_uri": "http://testserver/logout",
"state": "test-state-123",
"id_token_hint": self._id_token_hint("testserver"),
},
)
request.user = self.user
@@ -132,6 +158,7 @@ class TestEndSessionView(OAuthTestCase):
view.request = request
view.kwargs = {"application_slug": self.app.slug}
view.resolve_provider_application()
view.validate()
self.assertIn("state=test-state-123", view.post_logout_redirect_uri)
@@ -146,6 +173,7 @@ class TestEndSessionView(OAuthTestCase):
{
"post_logout_redirect_uri": "http://testserver/logout",
"state": "xyz789",
"id_token_hint": self._id_token_hint(self.brand.domain),
},
HTTP_HOST=self.brand.domain,
)
+56 -23
View File
@@ -5,6 +5,8 @@ from urllib.parse import quote, urlparse
from django.http import Http404, HttpRequest, HttpResponse
from django.shortcuts import get_object_or_404
from jwt import PyJWTError
from jwt import decode as jwt_decode
from authentik.common.oauth.constants import (
FORBIDDEN_URI_SCHEMES,
@@ -21,11 +23,14 @@ from authentik.flows.planner import (
from authentik.flows.stage import SessionEndStage
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.views import bad_request_message
from authentik.policies.views import PolicyAccessView, RequestValidationError
from authentik.policies.views import PolicyAccessView
from authentik.providers.iframe_logout import IframeLogoutStageView
from authentik.providers.oauth2.errors import TokenError
from authentik.providers.oauth2.models import (
AccessToken,
JWTAlgorithms,
OAuth2LogoutMethod,
OAuth2Provider,
RedirectURIMatchingMode,
)
from authentik.providers.oauth2.tasks import send_backchannel_logout_request
@@ -47,21 +52,45 @@ class EndSessionView(PolicyAccessView):
if not self.flow:
raise Http404
def validate(self):
# Parse end session parameters
query_dict = self.request.POST if self.request.method == "POST" else self.request.GET
state = query_dict.get("state")
request_redirect_uri = query_dict.get("post_logout_redirect_uri")
id_token_hint = query_dict.get("id_token_hint")
self.post_logout_redirect_uri = None
# OIDC Certification: Verify id_token_hint. If invalid or missing, throw an error
if id_token_hint:
# Load a fresh provider instance that's not part of the flow
# since it'll have the cryptography Certificate that can't be pickled
provider = OAuth2Provider.objects.get(pk=self.provider.pk)
key, alg = provider.jwt_key
if alg != JWTAlgorithms.HS256:
key = provider.signing_key.public_key
try:
jwt_decode(
id_token_hint,
key,
algorithms=[alg],
audience=provider.client_id,
issuer=provider.get_issuer(self.request),
# ID Tokens are short-lived; a logout request arriving
# after expiry is still legitimate and must succeed.
options={"verify_exp": False},
)
except PyJWTError:
raise TokenError("invalid_request").with_cause(
"id_token_hint_decode_failed"
) from None
# Validate post_logout_redirect_uri against registered URIs
if request_redirect_uri:
# OIDC Certification: id_token_hint required with post_logout_redirect_uri
if not id_token_hint:
raise TokenError("invalid_request").with_cause("id_token_hint_missing")
if urlparse(request_redirect_uri).scheme in FORBIDDEN_URI_SCHEMES:
raise RequestValidationError(
bad_request_message(
self.request,
"Forbidden URI scheme in post_logout_redirect_uri",
)
)
raise TokenError("invalid_request").with_cause("post_logout_redirect_uri")
for allowed in self.provider.post_logout_redirect_uris:
if allowed.matching_mode == RedirectURIMatchingMode.STRICT:
if request_redirect_uri == allowed.url:
@@ -71,6 +100,10 @@ class EndSessionView(PolicyAccessView):
if fullmatch(allowed.url, request_redirect_uri):
self.post_logout_redirect_uri = request_redirect_uri
break
# OIDC Certification: OP MUST NOT perform post-logout redirection
# if the supplied URI does not exactly match a registered one
if self.post_logout_redirect_uri is None:
raise TokenError("invalid_request").with_cause("invalid_post_logout_redirect_uri")
# Append state to the redirect URI if both are present
if self.post_logout_redirect_uri and state:
@@ -91,50 +124,43 @@ class EndSessionView(PolicyAccessView):
"<html><body>Logout successful</body></html>", content_type="text/html", status=200
)
# Otherwise, continue with normal policy checks
return super().dispatch(request, *args, **kwargs)
def get(self, request: HttpRequest, *args, **kwargs) -> HttpResponse:
"""Dispatch the flow planner for the invalidation flow"""
try:
self.validate()
except TokenError as exc:
return bad_request_message(
self.request,
exc.description,
)
planner = FlowPlanner(self.flow)
planner.allow_empty_flows = True
# Build flow context with logout parameters
context = {
PLAN_CONTEXT_APPLICATION: self.application,
}
# Get session info for logout notifications and token invalidation
auth_session = AuthenticatedSession.from_request(request, request.user)
# Add validated redirect URI (with state appended) to context if available
if self.post_logout_redirect_uri:
context[PLAN_CONTEXT_POST_LOGOUT_REDIRECT_URI] = self.post_logout_redirect_uri
# Invalidate tokens for this provider/session (RP-initiated logout:
# user stays logged into authentik, only this provider's tokens are revoked)
if request.user.is_authenticated and auth_session:
AccessToken.objects.filter(
user=request.user,
provider=self.provider,
session=auth_session,
).delete()
session_key = (
auth_session.session.session_key if auth_session and auth_session.session else None
)
# Handle frontchannel logout
frontchannel_logout_url = None
if self.provider.logout_method == OAuth2LogoutMethod.FRONTCHANNEL:
frontchannel_logout_url = build_frontchannel_logout_url(
self.provider, request, session_key
)
# Handle backchannel logout
if (
self.provider.logout_method == OAuth2LogoutMethod.BACKCHANNEL
and self.provider.logout_uri
):
# Find access token to get iss and sub for the logout token
access_token = AccessToken.objects.filter(
user=request.user,
provider=self.provider,
@@ -163,9 +189,16 @@ class EndSessionView(PolicyAccessView):
}
]
access_tokens = AccessToken.objects.filter(
user=request.user,
provider=self.provider,
)
if auth_session:
access_tokens = access_tokens.filter(session=auth_session)
access_tokens.delete()
plan = planner.plan(request, context)
# Inject iframe logout stage if frontchannel logout is configured
if frontchannel_logout_url:
plan.insert_stage(in_memory_stage(IframeLogoutStageView))
+16
View File
@@ -73,8 +73,16 @@ entries:
redirect_uris:
- matching_mode: strict
url: https://localhost:8443/test/a/authentik/callback
redirect_uri_type: authorization
- matching_mode: strict
url: https://host.docker.internal:8443/test/a/authentik/callback
redirect_uri_type: authorization
- matching_mode: strict
url: https://localhost:8443/test/a/authentik/post_logout_redirect
redirect_uri_type: logout
- matching_mode: strict
url: https://host.docker.internal:8443/test/a/authentik/post_logout_redirect
redirect_uri_type: logout
grant_types:
- authorization_code
- implicit
@@ -108,8 +116,16 @@ entries:
redirect_uris:
- matching_mode: strict
url: https://localhost:8443/test/a/authentik/callback
redirect_uri_type: authorization
- matching_mode: strict
url: https://host.docker.internal:8443/test/a/authentik/callback
redirect_uri_type: authorization
- matching_mode: strict
url: https://localhost:8443/test/a/authentik/post_logout_redirect
redirect_uri_type: logout
- matching_mode: strict
url: https://host.docker.internal:8443/test/a/authentik/post_logout_redirect
redirect_uri_type: logout
grant_types:
- authorization_code
- implicit
+7
View File
@@ -7,6 +7,7 @@ from channels.testing import ChannelsLiveServerTestCase
from django.apps import apps
from django.contrib.staticfiles.testing import StaticLiveServerTestCase
from django.urls import reverse
from docker.types import Healthcheck
from dramatiq import get_broker
from structlog.stdlib import get_logger
from yaml import safe_dump
@@ -106,7 +107,13 @@ class SSLLiveMixin(DockerTestCase):
"--api=true",
"--api.dashboard=true",
"--api.insecure=true",
"--ping=true",
],
healthcheck=Healthcheck(
test=["CMD", "traefik", "healthcheck", "--ping"],
interval=5 * 1_000 * 1_000_000,
start_period=1 * 1_000 * 1_000_000,
),
ports={
"9443": None,
},
+2 -5
View File
@@ -8,11 +8,12 @@ from selenium.webdriver.support import expected_conditions as ec
from authentik.blueprints.tests import apply_blueprint, reconcile_app
from authentik.providers.oauth2.models import OAuth2Provider
from tests.live import SSLLiveMixin
from tests.openid_conformance.conformance import Conformance
from tests.selenium import SeleniumTestCase
class TestOpenIDConformance(SeleniumTestCase):
class TestOpenIDConformance(SSLLiveMixin, SeleniumTestCase):
conformance: Conformance
@@ -60,10 +61,6 @@ class TestOpenIDConformance(SeleniumTestCase):
},
"consent": {},
}
self.test_variant = {
"server_metadata": "discovery",
"client_registration": "static_client",
}
def run_test(
self, test_name: str, test_plan_config: dict[str, Any], test_variant: dict[str, Any]
@@ -0,0 +1,39 @@
from unittest.mock import patch
import urllib3
from authentik.flows.models import Flow
from authentik.lib.utils.http import get_http_session as real_get_http_session
from authentik.providers.oauth2.models import OAuth2LogoutMethod, OAuth2Provider
from tests.decorators import retry
from tests.openid_conformance.base import TestOpenIDConformance
def _insecure_http_session():
session = real_get_http_session()
session.verify = False
return session
@patch("authentik.providers.oauth2.tasks.get_http_session", _insecure_http_session)
class TestOpenIDConformanceBackchannel(TestOpenIDConformance):
def setUp(self):
super().setUp()
OAuth2Provider.objects.filter(name__startswith="oidc-conformance-").update(
invalidation_flow=Flow.objects.get(slug="default-invalidation-flow"),
logout_method=OAuth2LogoutMethod.BACKCHANNEL,
logout_uri="https://localhost:8443/test/a/authentik/backchannel_logout",
)
# We are unable to use https for this at the current time
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
@retry()
def test_oidcc_backchannel_logout_certification_test_plan(self):
self.run_test(
"oidcc-backchannel-rp-initiated-logout-certification-test-plan",
self.test_plan_config,
{
"client_registration": "static_client",
"response_type": "code",
},
)
+6 -1
View File
@@ -7,5 +7,10 @@ class TestOpenIDConformanceBasic(TestOpenIDConformance):
@retry()
def test_oidcc_basic_certification_test(self):
self.run_test(
"oidcc-basic-certification-test-plan", self.test_plan_config, self.test_variant
"oidcc-basic-certification-test-plan",
self.test_plan_config,
{
"server_metadata": "discovery",
"client_registration": "static_client",
},
)
@@ -0,0 +1,26 @@
from authentik.flows.models import Flow
from authentik.providers.oauth2.models import OAuth2LogoutMethod, OAuth2Provider
from tests.decorators import retry
from tests.openid_conformance.base import TestOpenIDConformance
class TestOpenIDConformanceFrontchannel(TestOpenIDConformance):
def setUp(self):
super().setUp()
OAuth2Provider.objects.filter(name__startswith="oidc-conformance-").update(
invalidation_flow=Flow.objects.get(slug="default-invalidation-flow"),
logout_method=OAuth2LogoutMethod.FRONTCHANNEL,
logout_uri="https://localhost:8443/test/a/authentik/frontchannel_logout",
)
@retry()
def test_oidcc_frontchannel_logout_certification_test_plan(self):
self.run_test(
"oidcc-frontchannel-rp-initiated-logout-certification-test-plan",
self.test_plan_config,
{
"client_registration": "static_client",
"response_type": "code",
},
)
@@ -7,5 +7,10 @@ class TestOpenIDConformanceImplicit(TestOpenIDConformance):
@retry()
def test_oidcc_implicit_certification_test_plan(self):
self.run_test(
"oidcc-implicit-certification-test-plan", self.test_plan_config, self.test_variant
"oidcc-implicit-certification-test-plan",
self.test_plan_config,
{
"server_metadata": "discovery",
"client_registration": "static_client",
},
)
@@ -0,0 +1,24 @@
from authentik.flows.models import Flow
from authentik.providers.oauth2.models import OAuth2Provider
from tests.decorators import retry
from tests.openid_conformance.base import TestOpenIDConformance
class TestOpenIDConformanceRPInitiated(TestOpenIDConformance):
def setUp(self):
super().setUp()
OAuth2Provider.objects.filter(name__startswith="oidc-conformance-").update(
invalidation_flow=Flow.objects.get(slug="default-invalidation-flow"),
)
@retry()
def test_oidcc_rp_initiated_certification_test_plan(self):
self.run_test(
"oidcc-rp-initiated-logout-certification-test-plan",
self.test_plan_config,
{
"client_registration": "static_client",
"response_type": "code",
},
)