diff --git a/authentik/sources/saml/processors/response.py b/authentik/sources/saml/processors/response.py index 5180763ce1..e6a514771a 100644 --- a/authentik/sources/saml/processors/response.py +++ b/authentik/sources/saml/processors/response.py @@ -1,6 +1,7 @@ """authentik saml source processor""" from base64 import b64decode +from datetime import UTC, datetime from time import mktime from typing import TYPE_CHECKING @@ -40,6 +41,7 @@ from authentik.sources.saml.exceptions import ( InvalidSignature, MismatchedRequestID, MissingSAMLResponse, + SAMLException, UnsupportedNameIDFormat, ) from authentik.sources.saml.models import ( @@ -95,6 +97,7 @@ class ResponseProcessor: self._verify_request_id() self._verify_status() + self._verify_conditions() def _decrypt_response(self): """Decrypt SAMLResponse EncryptedAssertion Element""" @@ -126,6 +129,20 @@ class ResponseProcessor: ) self._assertion = decrypted_assertion + def _verify_conditions(self): + conditions = self.get_assertion().find(f"{{{NS_SAML_ASSERTION}}}Conditions") + if conditions is None: + return + _now = now() + before = conditions.attrib.get("NotBefore") + if before: + if datetime.fromisoformat(before).replace(tzinfo=UTC) > _now: + raise SAMLException("Assertion is not valid yet or expired.") + on_or_after = conditions.attrib.get("NotOnOrAfter") + if on_or_after: + if datetime.fromisoformat(on_or_after).replace(tzinfo=UTC) < _now: + raise SAMLException("Assertion is not valid yet or expired.") + def _verify_signature(self, signature_node: _Element): """Verify a single signature node""" xmlsec.tree.add_ids(self._root, ["ID"]) diff --git a/authentik/sources/saml/tests/test_property_mappings.py b/authentik/sources/saml/tests/test_property_mappings.py index d7a51b6b39..dee422e746 100644 --- a/authentik/sources/saml/tests/test_property_mappings.py +++ b/authentik/sources/saml/tests/test_property_mappings.py @@ -4,6 +4,7 @@ from base64 import b64encode from defusedxml.lxml import fromstring from django.test import TestCase +from freezegun import freeze_time from authentik.common.saml.constants import NS_SAML_ASSERTION from authentik.core.tests.utils import RequestFactory, create_test_flow @@ -34,6 +35,7 @@ class TestPropertyMappings(TestCase): pre_authentication_flow=create_test_flow(), ) + @freeze_time("2022-10-14T14:15:00") def test_user_base_properties(self): """Test user base properties""" properties = self.source.get_base_user_properties( @@ -61,6 +63,7 @@ class TestPropertyMappings(TestCase): properties = self.source.get_base_group_properties(root=ROOT, group_id=group_id) self.assertEqual(properties, {"name": group_id}) + @freeze_time("2022-10-14T14:15:00") def test_user_property_mappings(self): """Test user property mappings""" self.source.user_property_mappings.add( @@ -94,6 +97,7 @@ class TestPropertyMappings(TestCase): }, ) + @freeze_time("2022-10-14T14:15:00") def test_group_property_mappings(self): """Test group property mappings""" self.source.group_property_mappings.add( diff --git a/authentik/sources/saml/tests/test_response.py b/authentik/sources/saml/tests/test_response.py index f448e351d9..c8092f9c1e 100644 --- a/authentik/sources/saml/tests/test_response.py +++ b/authentik/sources/saml/tests/test_response.py @@ -3,6 +3,7 @@ from base64 import b64encode from django.test import TestCase +from freezegun import freeze_time from authentik.core.tests.utils import RequestFactory, create_test_cert, create_test_flow from authentik.crypto.models import CertificateKeyPair @@ -46,6 +47,7 @@ class TestResponseProcessor(TestCase): ): ResponseProcessor(self.source, request).parse() + @freeze_time("2022-10-14T14:15:00") def test_success(self): """Test success""" request = self.factory.post( @@ -72,6 +74,7 @@ class TestResponseProcessor(TestCase): }, ) + @freeze_time("2022-10-14T14:16:40Z") def test_success_with_status_message_and_detail(self): """Test success with StatusMessage and StatusDetail present (should not raise error)""" request = self.factory.post( @@ -88,6 +91,7 @@ class TestResponseProcessor(TestCase): sfm = parser.prepare_flow_manager() self.assertEqual(sfm.user_properties["username"], "jens@goauthentik.io") + @freeze_time("2022-10-14T14:16:40Z") def test_error_with_message_and_detail(self): """Test error status with StatusMessage and StatusDetail includes both in error""" request = self.factory.post( @@ -105,6 +109,7 @@ class TestResponseProcessor(TestCase): self.assertIn("User account is disabled", str(ctx.exception)) self.assertIn("Authentication failed", str(ctx.exception)) + @freeze_time("2024-08-07T15:48:09.325Z") def test_encrypted_correct(self): """Test encrypted""" key = load_fixture("fixtures/encrypted-key.pem") @@ -142,6 +147,7 @@ class TestResponseProcessor(TestCase): with self.assertRaises(InvalidEncryption): parser.parse() + @freeze_time("2022-10-14T14:16:40Z") def test_verification_assertion(self): """Test verifying signature inside assertion""" key = load_fixture("fixtures/signature_cert.pem") @@ -164,6 +170,7 @@ class TestResponseProcessor(TestCase): parser = ResponseProcessor(self.source, request) parser.parse() + @freeze_time("2014-07-17T01:02:18Z") def test_verification_assertion_duplicate(self): """Test verifying signature inside assertion, where the response has another assertion before our signed assertion""" @@ -189,6 +196,7 @@ class TestResponseProcessor(TestCase): self.assertNotEqual(parser._get_name_id().text, "bad") self.assertEqual(parser._get_name_id().text, "_ce3d2948b4cf20146dee0a0b3dd6f69b6cf86f62d7") + @freeze_time("2014-07-17T01:02:18Z") def test_verification_response(self): """Test verifying signature inside response""" key = load_fixture("fixtures/signature_cert.pem") @@ -211,6 +219,7 @@ class TestResponseProcessor(TestCase): parser = ResponseProcessor(self.source, request) parser.parse() + @freeze_time("2024-01-18T06:20:48Z") def test_verification_response_and_assertion(self): """Test verifying signature inside response and assertion""" key = load_fixture("fixtures/signature_cert.pem") @@ -257,6 +266,7 @@ class TestResponseProcessor(TestCase): with self.assertRaisesMessage(InvalidSignature, ""): parser.parse() + @freeze_time("2022-10-14T14:15:00") def test_verification_no_signature(self): """Test rejecting response without signature when signed_assertion is True""" key = load_fixture("fixtures/signature_cert.pem") @@ -303,6 +313,7 @@ class TestResponseProcessor(TestCase): with self.assertRaisesMessage(InvalidSignature, ""): parser.parse() + @freeze_time("2025-10-30T05:45:47.619Z") def test_signed_encrypted_response(self): """Test signed & encrypted response""" verification_key = load_fixture("fixtures/signature_cert2.pem") @@ -330,6 +341,7 @@ class TestResponseProcessor(TestCase): parser = ResponseProcessor(self.source, request) parser.parse() + @freeze_time("2026-01-21T14:23") def test_transient(self): """Test SAML transient NameID""" verification_key = load_fixture("fixtures/signature_cert2.pem") diff --git a/authentik/sources/saml/tests/test_views.py b/authentik/sources/saml/tests/test_views.py index 542e6d10d1..291cfea8a6 100644 --- a/authentik/sources/saml/tests/test_views.py +++ b/authentik/sources/saml/tests/test_views.py @@ -4,6 +4,7 @@ from base64 import b64encode from django.test import RequestFactory, TestCase from django.urls import reverse +from freezegun import freeze_time from authentik.core.tests.utils import create_test_flow from authentik.flows.planner import PLAN_CONTEXT_REDIRECT, FlowPlan @@ -26,6 +27,7 @@ class TestViews(TestCase): pre_authentication_flow=create_test_flow(), ) + @freeze_time("2022-10-14T14:15:00") def test_enroll(self): """Enroll""" flow = create_test_flow() @@ -52,6 +54,7 @@ class TestViews(TestCase): plan: FlowPlan = self.client.session.get(SESSION_KEY_PLAN) self.assertIsNotNone(plan) + @freeze_time("2022-10-14T14:15:00") def test_enroll_redirect(self): """Enroll when attempting to access a provider""" initial_redirect = f"http://{generate_id()}"