core: group browser logins and reject superseded sessions

Logins now stamp an opaque browser key from the new authentik_accounts
cookie onto their AuthenticatedSession. A new login marks the browser's
previous sessions as not current in the database, and the session store
refuses to load them, so a recorded session cookie can't be replayed to
resume the previous account after a switch.

A7k-product: product
A7k-product-repo: 1
Co-authored-by: Agent <claudeagent@svc.sdko.net>
This commit is contained in:
Dominic R
2026-06-12 15:48:47 -04:00
parent 8c44b67bba
commit b005f8a4f9
5 changed files with 213 additions and 2 deletions
@@ -0,0 +1,23 @@
# Generated by Django 5.2.15 on 2026-06-12 19:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_core", "0059_add_application_meta_hide"),
]
operations = [
migrations.AddField(
model_name="authenticatedsession",
name="browser_key",
field=models.CharField(db_index=True, max_length=64, null=True),
),
migrations.AddField(
model_name="authenticatedsession",
name="is_current",
field=models.BooleanField(default=True),
),
]
+18
View File
@@ -1368,6 +1368,15 @@ class AuthenticatedSession(SerializerModel):
user = models.ForeignKey(User, on_delete=models.CASCADE)
# Value of the browser cookie that groups all logins created by the same browser.
# Sessions without it (pre-existing ones, or sessions created outside a browser
# context) can never be offered for account switching.
browser_key = models.CharField(max_length=64, null=True, db_index=True)
# Whether this is the login the browser most recently switched to. Stored in the
# database (not just in the cookie) so a recorded session cookie can't be replayed
# after the browser switched to another account.
is_current = models.BooleanField(default=True)
@property
def serializer(self) -> type[Serializer]:
from authentik.core.api.authenticated_sessions import AuthenticatedSessionSerializer
@@ -1384,11 +1393,20 @@ class AuthenticatedSession(SerializerModel):
@staticmethod
def from_request(request: HttpRequest, user: User) -> AuthenticatedSession | None:
"""Create a new session from a http request"""
from authentik.root.middleware import SessionMiddleware
if not hasattr(request, "session") or not request.session.exists(
request.session.session_key
):
return None
browser_key = SessionMiddleware.ensure_browser_key(request)
if browser_key:
# The new login takes over the browser; all sessions it created before
# become switch targets only. Concurrent logins on the same browser may
# briefly leave two current sessions, the cookies pick the winner.
AuthenticatedSession.objects.filter(browser_key=browser_key).update(is_current=False)
return AuthenticatedSession(
session=Session.objects.filter(session_key=request.session.session_key).first(),
user=user,
browser_key=browser_key,
)
+16 -2
View File
@@ -32,15 +32,27 @@ class SessionStore(SessionBase):
def model_fields(self):
return [k.value for k in self.model.Keys]
@staticmethod
def _check_superseded(session) -> None:
"""Reject logins the browser has since switched away from. is_current is cleared
in the database when another login takes over the browser
(AuthenticatedSession.from_request), so a recorded session cookie can't be
replayed to resume the previous account without going through a flow."""
authenticated_session = getattr(session, "authenticatedsession", None)
if authenticated_session is not None and not authenticated_session.is_current:
raise SuspiciousOperation("Session denied: superseded by a newer login")
def _get_session_from_db(self):
try:
return self.model.objects.select_related(
session = self.model.objects.select_related(
"authenticatedsession",
"authenticatedsession__user",
).get(
session_key=self.session_key,
expires__gt=timezone.now(),
)
self._check_superseded(session)
return session
except (self.model.DoesNotExist, SuspiciousOperation) as exc:
if isinstance(exc, SuspiciousOperation):
LOGGER.warning(str(exc))
@@ -48,13 +60,15 @@ class SessionStore(SessionBase):
async def _aget_session_from_db(self):
try:
return await self.model.objects.select_related(
session = await self.model.objects.select_related(
"authenticatedsession",
"authenticatedsession__user",
).aget(
session_key=self.session_key,
expires__gt=timezone.now(),
)
self._check_superseded(session)
return session
except (self.model.DoesNotExist, SuspiciousOperation) as exc:
if isinstance(exc, SuspiciousOperation):
LOGGER.warning(str(exc))
+111
View File
@@ -0,0 +1,111 @@
"""Session browser grouping tests"""
from django.conf import settings
from django.contrib.auth.models import AnonymousUser
from django.http import HttpResponse
from django.test import RequestFactory, TestCase
from django.urls import reverse
from django.utils.crypto import get_random_string
from authentik.core.models import AuthenticatedSession, Session, User
from authentik.core.sessions import SessionStore
from authentik.core.tests.utils import create_test_user
from authentik.root.middleware import BROWSER_KEY_LENGTH, COOKIE_NAME_ACCOUNTS, SessionMiddleware
def create_session(user: User, browser_key: str | None = None) -> AuthenticatedSession:
"""Create a live login for the given user"""
store = SessionStore()
store.create()
return AuthenticatedSession.objects.create(
session=Session.objects.get(session_key=store.session_key),
user=user,
browser_key=browser_key,
)
class TestSessionSuperseding(TestCase):
"""Test that logins a browser switched away from can't be replayed"""
def setUp(self):
self.user = create_test_user()
def test_current_session_loads(self):
"""Test the current login of a browser keeps working"""
target = create_session(self.user, browser_key=get_random_string(BROWSER_KEY_LENGTH))
self.client.cookies[settings.SESSION_COOKIE_NAME] = target.session.session_key
response = self.client.get(reverse("authentik_api:user-me"))
self.assertEqual(response.status_code, 200)
self.assertEqual(response.json()["user"]["username"], self.user.username)
def test_superseded_session_rejected(self):
"""Test a recorded session cookie can't be replayed after an account switch"""
target = create_session(self.user, browser_key=get_random_string(BROWSER_KEY_LENGTH))
target.is_current = False
target.save(update_fields=["is_current"])
self.client.cookies[settings.SESSION_COOKIE_NAME] = target.session.session_key
response = self.client.get(reverse("authentik_api:user-me"))
self.assertEqual(response.status_code, 403)
def test_login_supersedes_other_browser_sessions(self):
"""Test a new login marks the browser's previous logins as not current"""
browser_key = get_random_string(BROWSER_KEY_LENGTH)
previous = create_session(self.user, browser_key=browser_key)
other_browser = create_session(self.user, browser_key=get_random_string(BROWSER_KEY_LENGTH))
request = RequestFactory().get("/")
request.browser_key = browser_key
request.session = SessionStore()
request.session.create()
new_session = AuthenticatedSession.from_request(request, self.user)
self.assertEqual(new_session.browser_key, browser_key)
self.assertTrue(new_session.is_current)
previous.refresh_from_db()
self.assertFalse(previous.is_current)
other_browser.refresh_from_db()
self.assertTrue(other_browser.is_current)
class TestBrowserCookie(TestCase):
"""Test issuance and validation of the browser cookie"""
def test_cookie_issued_alongside_session(self):
"""Test the accounts cookie is set when a session with a browser key is saved"""
def view(request):
request.user = AnonymousUser()
request.session["foo"] = "bar"
SessionMiddleware.ensure_browser_key(request)
return HttpResponse()
response = SessionMiddleware(view)(RequestFactory().get("/"))
cookie = response.cookies.get(COOKIE_NAME_ACCOUNTS)
self.assertIsNotNone(cookie)
self.assertEqual(len(cookie.value), BROWSER_KEY_LENGTH)
def test_existing_cookie_reused(self):
"""Test an existing accounts cookie is parsed onto the request"""
browser_key = get_random_string(BROWSER_KEY_LENGTH)
request = RequestFactory().get("/")
request.COOKIES[COOKIE_NAME_ACCOUNTS] = browser_key
SessionMiddleware(lambda request: HttpResponse()).process_request(request)
self.assertEqual(request.browser_key, browser_key)
self.assertEqual(SessionMiddleware.ensure_browser_key(request), browser_key)
def test_parse_browser_key(self):
"""Test browser cookie values are validated"""
valid = get_random_string(BROWSER_KEY_LENGTH)
self.assertEqual(SessionMiddleware.parse_browser_key(valid), valid)
self.assertIsNone(SessionMiddleware.parse_browser_key(None))
self.assertIsNone(SessionMiddleware.parse_browser_key(""))
self.assertIsNone(SessionMiddleware.parse_browser_key("too-short"))
self.assertIsNone(SessionMiddleware.parse_browser_key("x" * (BROWSER_KEY_LENGTH + 1)))
self.assertIsNone(SessionMiddleware.parse_browser_key("!" * BROWSER_KEY_LENGTH))
+45
View File
@@ -1,6 +1,7 @@
"""Dynamically set SameSite depending if the upstream connection is TLS or not"""
from collections.abc import Callable
from datetime import timedelta
from hashlib import sha512
from ipaddress import ip_address
from time import perf_counter, time
@@ -16,6 +17,7 @@ from django.http.response import HttpResponse, HttpResponseServerError
from django.middleware.csrf import CSRF_SESSION_KEY
from django.middleware.csrf import CsrfViewMiddleware as UpstreamCsrfViewMiddleware
from django.utils.cache import patch_vary_headers
from django.utils.crypto import get_random_string
from django.utils.http import http_date
from jwt import PyJWTError, decode, encode
from sentry_sdk import Scope
@@ -28,6 +30,13 @@ LOGGER = get_logger("authentik.asgi")
ACR_AUTHENTIK_SESSION = "goauthentik.io/core/default"
SIGNING_HASH = sha512(settings.SECRET_KEY.encode()).hexdigest()
# Opaque identifier for the browser itself, which groups all logins created by it
# (AuthenticatedSession.browser_key) so they can be offered for account switching.
# Unlike the session cookie it survives logins and logouts.
COOKIE_NAME_ACCOUNTS = "authentik_accounts"
BROWSER_KEY_LENGTH = 32
BROWSER_COOKIE_AGE = int(timedelta(days=365).total_seconds())
class SessionMiddleware(UpstreamSessionMiddleware):
"""Dynamically set SameSite depending if the upstream connection is TLS or not"""
@@ -78,9 +87,30 @@ class SessionMiddleware(UpstreamSessionMiddleware):
value = session_key
return value
@staticmethod
def parse_browser_key(raw: str | None) -> str | None:
"""Validate the browser cookie value; it is opaque and only ever compared for equality"""
if raw and len(raw) == BROWSER_KEY_LENGTH and raw.isalnum():
return raw
return None
@staticmethod
def ensure_browser_key(request: HttpRequest) -> str | None:
"""Return the request's browser key, generating one if the browser doesn't have one
yet. Returns None for requests that didn't pass through the session middleware
(e.g. test client logins), leaving the session unbound to any browser."""
if not hasattr(request, "browser_key"):
return None
if not request.browser_key:
request.browser_key = get_random_string(BROWSER_KEY_LENGTH)
return request.browser_key
def process_request(self, request: HttpRequest):
raw_session = request.COOKIES.get(settings.SESSION_COOKIE_NAME)
session_key = SessionMiddleware.decode_session_key(raw_session)
request.browser_key = SessionMiddleware.parse_browser_key(
request.COOKIES.get(COOKIE_NAME_ACCOUNTS)
)
request.session = self.SessionStore(
session_key,
last_ip=ClientIPMiddleware.get_client_ip(request),
@@ -145,6 +175,21 @@ class SessionMiddleware(UpstreamSessionMiddleware):
httponly=settings.SESSION_COOKIE_HTTPONLY or None,
samesite=same_site,
)
# Issue/refresh the browser cookie alongside the session cookie so it
# always outlives the sessions bound to it. It is intentionally not
# deleted with the session cookie, as it groups the other logins
# this browser still has.
if getattr(request, "browser_key", None):
response.set_cookie(
COOKIE_NAME_ACCOUNTS,
request.browser_key,
max_age=BROWSER_COOKIE_AGE,
domain=settings.SESSION_COOKIE_DOMAIN,
path=settings.SESSION_COOKIE_PATH,
secure=secure,
httponly=True,
samesite=same_site,
)
return response