diff --git a/authentik/core/migrations/0060_authenticatedsession_browser_key_and_more.py b/authentik/core/migrations/0060_authenticatedsession_browser_key_and_more.py new file mode 100644 index 0000000000..8169f914fb --- /dev/null +++ b/authentik/core/migrations/0060_authenticatedsession_browser_key_and_more.py @@ -0,0 +1,23 @@ +# Generated by Django 5.2.15 on 2026-06-12 19:45 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ("authentik_core", "0059_add_application_meta_hide"), + ] + + operations = [ + migrations.AddField( + model_name="authenticatedsession", + name="browser_key", + field=models.CharField(db_index=True, max_length=64, null=True), + ), + migrations.AddField( + model_name="authenticatedsession", + name="is_current", + field=models.BooleanField(default=True), + ), + ] diff --git a/authentik/core/models.py b/authentik/core/models.py index d549ae7471..358756d051 100644 --- a/authentik/core/models.py +++ b/authentik/core/models.py @@ -1368,6 +1368,15 @@ class AuthenticatedSession(SerializerModel): user = models.ForeignKey(User, on_delete=models.CASCADE) + # Value of the browser cookie that groups all logins created by the same browser. + # Sessions without it (pre-existing ones, or sessions created outside a browser + # context) can never be offered for account switching. + browser_key = models.CharField(max_length=64, null=True, db_index=True) + # Whether this is the login the browser most recently switched to. Stored in the + # database (not just in the cookie) so a recorded session cookie can't be replayed + # after the browser switched to another account. + is_current = models.BooleanField(default=True) + @property def serializer(self) -> type[Serializer]: from authentik.core.api.authenticated_sessions import AuthenticatedSessionSerializer @@ -1384,11 +1393,20 @@ class AuthenticatedSession(SerializerModel): @staticmethod def from_request(request: HttpRequest, user: User) -> AuthenticatedSession | None: """Create a new session from a http request""" + from authentik.root.middleware import SessionMiddleware + if not hasattr(request, "session") or not request.session.exists( request.session.session_key ): return None + browser_key = SessionMiddleware.ensure_browser_key(request) + if browser_key: + # The new login takes over the browser; all sessions it created before + # become switch targets only. Concurrent logins on the same browser may + # briefly leave two current sessions, the cookies pick the winner. + AuthenticatedSession.objects.filter(browser_key=browser_key).update(is_current=False) return AuthenticatedSession( session=Session.objects.filter(session_key=request.session.session_key).first(), user=user, + browser_key=browser_key, ) diff --git a/authentik/core/sessions.py b/authentik/core/sessions.py index 1286640278..45cea14a72 100644 --- a/authentik/core/sessions.py +++ b/authentik/core/sessions.py @@ -32,15 +32,27 @@ class SessionStore(SessionBase): def model_fields(self): return [k.value for k in self.model.Keys] + @staticmethod + def _check_superseded(session) -> None: + """Reject logins the browser has since switched away from. is_current is cleared + in the database when another login takes over the browser + (AuthenticatedSession.from_request), so a recorded session cookie can't be + replayed to resume the previous account without going through a flow.""" + authenticated_session = getattr(session, "authenticatedsession", None) + if authenticated_session is not None and not authenticated_session.is_current: + raise SuspiciousOperation("Session denied: superseded by a newer login") + def _get_session_from_db(self): try: - return self.model.objects.select_related( + session = self.model.objects.select_related( "authenticatedsession", "authenticatedsession__user", ).get( session_key=self.session_key, expires__gt=timezone.now(), ) + self._check_superseded(session) + return session except (self.model.DoesNotExist, SuspiciousOperation) as exc: if isinstance(exc, SuspiciousOperation): LOGGER.warning(str(exc)) @@ -48,13 +60,15 @@ class SessionStore(SessionBase): async def _aget_session_from_db(self): try: - return await self.model.objects.select_related( + session = await self.model.objects.select_related( "authenticatedsession", "authenticatedsession__user", ).aget( session_key=self.session_key, expires__gt=timezone.now(), ) + self._check_superseded(session) + return session except (self.model.DoesNotExist, SuspiciousOperation) as exc: if isinstance(exc, SuspiciousOperation): LOGGER.warning(str(exc)) diff --git a/authentik/core/tests/test_sessions.py b/authentik/core/tests/test_sessions.py new file mode 100644 index 0000000000..34a514af1f --- /dev/null +++ b/authentik/core/tests/test_sessions.py @@ -0,0 +1,111 @@ +"""Session browser grouping tests""" + +from django.conf import settings +from django.contrib.auth.models import AnonymousUser +from django.http import HttpResponse +from django.test import RequestFactory, TestCase +from django.urls import reverse +from django.utils.crypto import get_random_string + +from authentik.core.models import AuthenticatedSession, Session, User +from authentik.core.sessions import SessionStore +from authentik.core.tests.utils import create_test_user +from authentik.root.middleware import BROWSER_KEY_LENGTH, COOKIE_NAME_ACCOUNTS, SessionMiddleware + + +def create_session(user: User, browser_key: str | None = None) -> AuthenticatedSession: + """Create a live login for the given user""" + store = SessionStore() + store.create() + return AuthenticatedSession.objects.create( + session=Session.objects.get(session_key=store.session_key), + user=user, + browser_key=browser_key, + ) + + +class TestSessionSuperseding(TestCase): + """Test that logins a browser switched away from can't be replayed""" + + def setUp(self): + self.user = create_test_user() + + def test_current_session_loads(self): + """Test the current login of a browser keeps working""" + target = create_session(self.user, browser_key=get_random_string(BROWSER_KEY_LENGTH)) + self.client.cookies[settings.SESSION_COOKIE_NAME] = target.session.session_key + + response = self.client.get(reverse("authentik_api:user-me")) + + self.assertEqual(response.status_code, 200) + self.assertEqual(response.json()["user"]["username"], self.user.username) + + def test_superseded_session_rejected(self): + """Test a recorded session cookie can't be replayed after an account switch""" + target = create_session(self.user, browser_key=get_random_string(BROWSER_KEY_LENGTH)) + target.is_current = False + target.save(update_fields=["is_current"]) + self.client.cookies[settings.SESSION_COOKIE_NAME] = target.session.session_key + + response = self.client.get(reverse("authentik_api:user-me")) + + self.assertEqual(response.status_code, 403) + + def test_login_supersedes_other_browser_sessions(self): + """Test a new login marks the browser's previous logins as not current""" + browser_key = get_random_string(BROWSER_KEY_LENGTH) + previous = create_session(self.user, browser_key=browser_key) + other_browser = create_session(self.user, browser_key=get_random_string(BROWSER_KEY_LENGTH)) + + request = RequestFactory().get("/") + request.browser_key = browser_key + request.session = SessionStore() + request.session.create() + new_session = AuthenticatedSession.from_request(request, self.user) + + self.assertEqual(new_session.browser_key, browser_key) + self.assertTrue(new_session.is_current) + previous.refresh_from_db() + self.assertFalse(previous.is_current) + other_browser.refresh_from_db() + self.assertTrue(other_browser.is_current) + + +class TestBrowserCookie(TestCase): + """Test issuance and validation of the browser cookie""" + + def test_cookie_issued_alongside_session(self): + """Test the accounts cookie is set when a session with a browser key is saved""" + + def view(request): + request.user = AnonymousUser() + request.session["foo"] = "bar" + SessionMiddleware.ensure_browser_key(request) + return HttpResponse() + + response = SessionMiddleware(view)(RequestFactory().get("/")) + + cookie = response.cookies.get(COOKIE_NAME_ACCOUNTS) + self.assertIsNotNone(cookie) + self.assertEqual(len(cookie.value), BROWSER_KEY_LENGTH) + + def test_existing_cookie_reused(self): + """Test an existing accounts cookie is parsed onto the request""" + browser_key = get_random_string(BROWSER_KEY_LENGTH) + request = RequestFactory().get("/") + request.COOKIES[COOKIE_NAME_ACCOUNTS] = browser_key + + SessionMiddleware(lambda request: HttpResponse()).process_request(request) + + self.assertEqual(request.browser_key, browser_key) + self.assertEqual(SessionMiddleware.ensure_browser_key(request), browser_key) + + def test_parse_browser_key(self): + """Test browser cookie values are validated""" + valid = get_random_string(BROWSER_KEY_LENGTH) + self.assertEqual(SessionMiddleware.parse_browser_key(valid), valid) + self.assertIsNone(SessionMiddleware.parse_browser_key(None)) + self.assertIsNone(SessionMiddleware.parse_browser_key("")) + self.assertIsNone(SessionMiddleware.parse_browser_key("too-short")) + self.assertIsNone(SessionMiddleware.parse_browser_key("x" * (BROWSER_KEY_LENGTH + 1))) + self.assertIsNone(SessionMiddleware.parse_browser_key("!" * BROWSER_KEY_LENGTH)) diff --git a/authentik/root/middleware.py b/authentik/root/middleware.py index e3b99426ab..9202fada41 100644 --- a/authentik/root/middleware.py +++ b/authentik/root/middleware.py @@ -1,6 +1,7 @@ """Dynamically set SameSite depending if the upstream connection is TLS or not""" from collections.abc import Callable +from datetime import timedelta from hashlib import sha512 from ipaddress import ip_address from time import perf_counter, time @@ -16,6 +17,7 @@ from django.http.response import HttpResponse, HttpResponseServerError from django.middleware.csrf import CSRF_SESSION_KEY from django.middleware.csrf import CsrfViewMiddleware as UpstreamCsrfViewMiddleware from django.utils.cache import patch_vary_headers +from django.utils.crypto import get_random_string from django.utils.http import http_date from jwt import PyJWTError, decode, encode from sentry_sdk import Scope @@ -28,6 +30,13 @@ LOGGER = get_logger("authentik.asgi") ACR_AUTHENTIK_SESSION = "goauthentik.io/core/default" SIGNING_HASH = sha512(settings.SECRET_KEY.encode()).hexdigest() +# Opaque identifier for the browser itself, which groups all logins created by it +# (AuthenticatedSession.browser_key) so they can be offered for account switching. +# Unlike the session cookie it survives logins and logouts. +COOKIE_NAME_ACCOUNTS = "authentik_accounts" +BROWSER_KEY_LENGTH = 32 +BROWSER_COOKIE_AGE = int(timedelta(days=365).total_seconds()) + class SessionMiddleware(UpstreamSessionMiddleware): """Dynamically set SameSite depending if the upstream connection is TLS or not""" @@ -78,9 +87,30 @@ class SessionMiddleware(UpstreamSessionMiddleware): value = session_key return value + @staticmethod + def parse_browser_key(raw: str | None) -> str | None: + """Validate the browser cookie value; it is opaque and only ever compared for equality""" + if raw and len(raw) == BROWSER_KEY_LENGTH and raw.isalnum(): + return raw + return None + + @staticmethod + def ensure_browser_key(request: HttpRequest) -> str | None: + """Return the request's browser key, generating one if the browser doesn't have one + yet. Returns None for requests that didn't pass through the session middleware + (e.g. test client logins), leaving the session unbound to any browser.""" + if not hasattr(request, "browser_key"): + return None + if not request.browser_key: + request.browser_key = get_random_string(BROWSER_KEY_LENGTH) + return request.browser_key + def process_request(self, request: HttpRequest): raw_session = request.COOKIES.get(settings.SESSION_COOKIE_NAME) session_key = SessionMiddleware.decode_session_key(raw_session) + request.browser_key = SessionMiddleware.parse_browser_key( + request.COOKIES.get(COOKIE_NAME_ACCOUNTS) + ) request.session = self.SessionStore( session_key, last_ip=ClientIPMiddleware.get_client_ip(request), @@ -145,6 +175,21 @@ class SessionMiddleware(UpstreamSessionMiddleware): httponly=settings.SESSION_COOKIE_HTTPONLY or None, samesite=same_site, ) + # Issue/refresh the browser cookie alongside the session cookie so it + # always outlives the sessions bound to it. It is intentionally not + # deleted with the session cookie, as it groups the other logins + # this browser still has. + if getattr(request, "browser_key", None): + response.set_cookie( + COOKIE_NAME_ACCOUNTS, + request.browser_key, + max_age=BROWSER_COOKIE_AGE, + domain=settings.SESSION_COOKIE_DOMAIN, + path=settings.SESSION_COOKIE_PATH, + secure=secure, + httponly=True, + samesite=same_site, + ) return response