stage/authenticator*: expand attempt throttling to email- and sms-based 2FA (#21751)

* stages/authenticator*: enable attempt throttling for email- and sms-based second authentication factor

* stages/authenticator*: add throttling tests

* stage/authenticator_validate: add throttling documentation

* Update website/docs/add-secure-apps/flows-stages/stages/authenticator_validate/index.mdx

Co-authored-by: Dominic R <dominic@sdko.org>
Signed-off-by: Alexander Tereshkin <96586+atereshkin@users.noreply.github.com>

* Update website/docs/add-secure-apps/flows-stages/stages/authenticator_validate/index.mdx

Co-authored-by: Dominic R <dominic@sdko.org>
Signed-off-by: Alexander Tereshkin <96586+atereshkin@users.noreply.github.com>

* stages/authenticator_validate: update docs wording

* Update website/docs/add-secure-apps/flows-stages/stages/authenticator_validate/index.mdx

Co-authored-by: Dominic R <dominic@sdko.org>
Signed-off-by: Alexander Tereshkin <96586+atereshkin@users.noreply.github.com>

* Update website/docs/add-secure-apps/flows-stages/stages/authenticator_validate/index.mdx

Co-authored-by: Dominic R <dominic@sdko.org>
Signed-off-by: Alexander Tereshkin <96586+atereshkin@users.noreply.github.com>

* Update website/docs/add-secure-apps/flows-stages/stages/authenticator_validate/index.mdx

Co-authored-by: Dominic R <dominic@sdko.org>
Signed-off-by: Alexander Tereshkin <96586+atereshkin@users.noreply.github.com>

---------

Signed-off-by: Alexander Tereshkin <96586+atereshkin@users.noreply.github.com>
Co-authored-by: Dominic R <dominic@sdko.org>
This commit is contained in:
Alexander Tereshkin
2026-05-07 20:12:06 +03:00
committed by GitHub
parent f1d3664c96
commit 93abd2e041
24 changed files with 782 additions and 37 deletions
+9 -7
View File
@@ -389,17 +389,19 @@ class ThrottlingMixin(models.Model):
"""Check if throttling is enabled"""
return self.get_throttle_factor() > 0
def get_throttle_factor(self): # pragma: no cover
def get_throttle_factor(self) -> float: # pragma: no cover
"""
This must be implemented to return the throttle factor.
Returns the throttling factor.
"""
return getattr(self, "_throttle_factor", 1.0)
def set_throttle_factor(self, throttle_factor: float) -> None:
"""
Sets the throttle factor to use. Call this to override the default value of 1.
The number of seconds required between verification attempts will be
:math:`c2^{n-1}` where `c` is this factor and `n` is the number of
previous failures. A factor of 1 translates to delays of 1, 2, 4, 8,
etc. seconds. A factor of 0 disables the throttling.
Normally this is just a wrapper for a plugin-specific setting like
:setting:`OTP_EMAIL_THROTTLE_FACTOR`.
"""
raise NotImplementedError()
self._throttle_factor = throttle_factor
+22 -3
View File
@@ -6,7 +6,6 @@ from threading import Thread
from django.contrib.auth.models import AnonymousUser
from django.db import connection
from django.test import TestCase, TransactionTestCase
from django.test.utils import override_settings
from django.utils import timezone
from freezegun import freeze_time
@@ -110,8 +109,24 @@ class ThrottlingTestMixin:
self.assertEqual(verify_is_allowed3, True)
self.assertEqual(data3, None)
def test_set_throttle_factor_is_reflected(self):
"""`set_throttle_factor` must drive `get_throttle_factor`."""
self.device.set_throttle_factor(5.5)
self.assertEqual(self.device.get_throttle_factor(), 5.5)
self.device.set_throttle_factor(0)
self.assertEqual(self.device.get_throttle_factor(), 0)
def test_throttling_disabled_by_factor_zero(self):
"""Setting the throttle factor to 0 must actually disable throttling.
A failed attempt followed by a successful one must succeed. The lockout
path must not kick in when the factor is 0.
"""
self.device.set_throttle_factor(0)
self.assertFalse(self.device.verify_token(self.invalid_token()))
self.assertTrue(self.device.verify_token(self.valid_token()))
@override_settings(OTP_STATIC_THROTTLE_FACTOR=0)
class APITestCase(TestCase):
"""Test API"""
@@ -119,6 +134,7 @@ class APITestCase(TestCase):
self.alice = create_test_admin_user("alice")
self.bob = create_test_admin_user("bob")
device = self.alice.staticdevice_set.create()
device.set_throttle_factor(0)
self.valid = generate_id(length=16)
device.token_set.create(token=self.valid)
@@ -138,6 +154,8 @@ class APITestCase(TestCase):
verified = verify_token(self.alice, device.persistent_id, "bogus")
self.assertIsNone(verified)
self.alice.staticdevice_set.get().throttle_reset()
verified = verify_token(self.alice, device.persistent_id, self.valid)
self.assertIsNotNone(verified)
@@ -146,11 +164,12 @@ class APITestCase(TestCase):
verified = match_token(self.alice, "bogus")
self.assertIsNone(verified)
self.alice.staticdevice_set.get().throttle_reset()
verified = match_token(self.alice, self.valid)
self.assertEqual(verified, self.alice.staticdevice_set.first())
@override_settings(OTP_STATIC_THROTTLE_FACTOR=0)
class ConcurrencyTestCase(TransactionTestCase):
"""Test concurrent verifications"""
@@ -0,0 +1,33 @@
# Generated by Django 5.2.12 on 2026-04-02 15:14
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
(
"authentik_stages_authenticator_email",
"0002_alter_authenticatoremailstage_friendly_name",
),
]
operations = [
migrations.AddField(
model_name="emaildevice",
name="throttling_failure_count",
field=models.PositiveIntegerField(
default=0, help_text="Number of successive failed attempts."
),
),
migrations.AddField(
model_name="emaildevice",
name="throttling_failure_timestamp",
field=models.DateTimeField(
blank=True,
default=None,
help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded.",
null=True,
),
),
]
+16 -2
View File
@@ -14,7 +14,7 @@ from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.config import CONFIG
from authentik.lib.models import SerializerModel
from authentik.lib.utils.time import timedelta_string_validator
from authentik.stages.authenticator.models import SideChannelDevice
from authentik.stages.authenticator.models import SideChannelDevice, ThrottlingMixin
from authentik.stages.email.models import EmailTemplates
from authentik.stages.email.utils import TemplateEmailMessage
@@ -116,7 +116,7 @@ class AuthenticatorEmailStage(ConfigurableStage, FriendlyNamedStage, Stage):
verbose_name_plural = _("Email Authenticator Setup Stages")
class EmailDevice(SerializerModel, SideChannelDevice):
class EmailDevice(SerializerModel, ThrottlingMixin, SideChannelDevice):
"""Email Device"""
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
@@ -130,6 +130,20 @@ class EmailDevice(SerializerModel, SideChannelDevice):
return EmailDeviceSerializer
def verify_token(self, token: str) -> bool:
verify_allowed, _ = self.verify_is_allowed()
if verify_allowed:
verified = super().verify_token(token)
if verified:
self.throttle_reset()
else:
self.throttle_increment()
else:
verified = False
return verified
def _compose_email(self) -> TemplateEmailMessage:
try:
pending_user = self.user
@@ -8,6 +8,7 @@ from django.core.mail.backends.locmem import EmailBackend
from django.core.mail.backends.smtp import EmailBackend as SMTPEmailBackend
from django.db.utils import IntegrityError
from django.template.exceptions import TemplateDoesNotExist
from django.test import TestCase
from django.urls import reverse
from django.utils.timezone import now
@@ -16,6 +17,7 @@ from authentik.flows.models import FlowStageBinding
from authentik.flows.tests import FlowTestCase
from authentik.lib.config import CONFIG
from authentik.lib.utils.email import mask_email
from authentik.stages.authenticator.tests import ThrottlingTestMixin
from authentik.stages.authenticator_email.api import (
AuthenticatorEmailStageSerializer,
EmailDeviceSerializer,
@@ -79,6 +81,7 @@ class TestAuthenticatorEmailStage(FlowTestCase):
self.assertFalse(self.device.verify_token("000000"))
# Verify correct token (should clear token after verification)
self.device.throttle_reset(commit=False)
self.assertTrue(self.device.verify_token(token))
self.assertIsNone(self.device.token)
@@ -329,3 +332,27 @@ class TestAuthenticatorEmailStage(FlowTestCase):
# Test AuthenticatorEmailStage send method
self.stage.send(self.device)
self.assertEqual(len(mail.outbox), 1)
class TestEmailDeviceThrottling(ThrottlingTestMixin, TestCase):
def setUp(self):
super().setUp()
flow = create_test_flow()
user = create_test_user()
stage = AuthenticatorEmailStage.objects.create(
name="email-authenticator-throttle",
use_global_settings=True,
from_address="test@authentik.local",
configure_flow=flow,
token_expiry="minutes=30",
) # nosec
self.device = EmailDevice.objects.create(
user=user, stage=stage, email="throttle@authentik.local"
)
self.device.generate_token()
def valid_token(self):
return self.device.token
def invalid_token(self):
return "000000" if self.device.token != "000000" else "111111"
@@ -0,0 +1,30 @@
# Generated by Django 5.2.12 on 2026-04-16 17:28
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_stages_authenticator_sms", "0008_alter_authenticatorsmsstage_friendly_name"),
]
operations = [
migrations.AddField(
model_name="smsdevice",
name="throttling_failure_count",
field=models.PositiveIntegerField(
default=0, help_text="Number of successive failed attempts."
),
),
migrations.AddField(
model_name="smsdevice",
name="throttling_failure_timestamp",
field=models.DateTimeField(
blank=True,
default=None,
help_text="A timestamp of the last failed verification attempt. Null if last attempt succeeded.",
null=True,
),
),
]
+15 -7
View File
@@ -20,7 +20,7 @@ from authentik.events.utils import sanitize_item
from authentik.flows.models import ConfigurableStage, FriendlyNamedStage, Stage
from authentik.lib.models import SerializerModel
from authentik.lib.utils.http import get_http_session
from authentik.stages.authenticator.models import SideChannelDevice
from authentik.stages.authenticator.models import SideChannelDevice, ThrottlingMixin
LOGGER = get_logger()
@@ -197,7 +197,7 @@ def hash_phone_number(phone_number: str) -> str:
return "hash:" + sha256(phone_number.encode()).hexdigest()
class SMSDevice(SerializerModel, SideChannelDevice):
class SMSDevice(SerializerModel, ThrottlingMixin, SideChannelDevice):
"""SMS Device"""
user = models.ForeignKey(get_user_model(), on_delete=models.CASCADE)
@@ -224,11 +224,19 @@ class SMSDevice(SerializerModel, SideChannelDevice):
return SMSDeviceSerializer
def verify_token(self, token):
valid = super().verify_token(token)
if valid:
self.save()
return valid
def verify_token(self, token: str) -> bool:
verify_allowed, _ = self.verify_is_allowed()
if verify_allowed:
verified = super().verify_token(token)
if verified:
self.throttle_reset()
else:
self.throttle_increment()
else:
verified = False
return verified
def __str__(self):
return str(self.name) or str(self.user_id)
@@ -3,6 +3,7 @@
from unittest.mock import MagicMock, patch
from urllib.parse import parse_qsl
from django.test import TestCase
from django.urls import reverse
from requests_mock import Mocker
@@ -12,6 +13,7 @@ from authentik.flows.planner import FlowPlan
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import SESSION_KEY_PLAN
from authentik.lib.generators import generate_id
from authentik.stages.authenticator.tests import ThrottlingTestMixin
from authentik.stages.authenticator_sms.models import (
AuthenticatorSMSStage,
SMSDevice,
@@ -357,3 +359,30 @@ class AuthenticatorSMSStageTests(FlowTestCase):
},
phone_number_required=False,
)
class TestSMSDeviceThrottling(ThrottlingTestMixin, TestCase):
"""Test ThrottlingMixin behaviour on SMSDevice.verify_token"""
def setUp(self):
super().setUp()
flow = create_test_flow()
user = create_test_admin_user()
stage = AuthenticatorSMSStage.objects.create(
flow=flow,
name="sms-throttle",
provider=SMSProviders.GENERIC,
from_number="1234",
)
self.device = SMSDevice.objects.create(
user=user,
stage=stage,
phone_number="+15551230001",
)
self.device.generate_token()
def valid_token(self):
return self.device.token
def invalid_token(self):
return "000000" if self.device.token != "000000" else "111111"
@@ -3,7 +3,6 @@
from base64 import b32encode
from os import urandom
from django.conf import settings
from django.core.validators import MaxValueValidator
from django.db import models
from django.utils.translation import gettext_lazy as _
@@ -78,9 +77,6 @@ class StaticDevice(SerializerModel, ThrottlingMixin, Device):
return StaticDeviceSerializer
def get_throttle_factor(self):
return getattr(settings, "OTP_STATIC_THROTTLE_FACTOR", 1)
def verify_token(self, token):
verify_allowed, _ = self.verify_is_allowed()
if verify_allowed:
@@ -1,6 +1,5 @@
"""Test Static API"""
from django.test.utils import override_settings
from django.urls import reverse
from rest_framework.test import APITestCase
@@ -44,9 +43,6 @@ class DeviceTest(TestCase):
str(device)
@override_settings(
OTP_STATIC_THROTTLE_FACTOR=1,
)
class ThrottlingTestCase(ThrottlingTestMixin, TestCase):
"""Test static device throttling"""
@@ -194,9 +194,6 @@ class TOTPDevice(SerializerModel, ThrottlingMixin, Device):
return verified
def get_throttle_factor(self):
return getattr(settings, "OTP_TOTP_THROTTLE_FACTOR", 1)
@property
def config_url(self):
"""
+4 -4
View File
@@ -63,11 +63,14 @@ class TOTPDeviceMixin:
@override_settings(
OTP_TOTP_SYNC=False,
OTP_TOTP_THROTTLE_FACTOR=0,
)
class TOTPTest(TOTPDeviceMixin, TestCase):
"""TOTP tests"""
def setUp(self):
super().setUp()
self.device.set_throttle_factor(0)
def test_default_key(self):
"""Ensure default_key is valid"""
device = self.alice.totpdevice_set.create()
@@ -190,9 +193,6 @@ class TOTPTest(TOTPDeviceMixin, TestCase):
self.assertEqual(params["image"][0], image_url)
@override_settings(
OTP_TOTP_THROTTLE_FACTOR=1,
)
class ThrottlingTestCase(TOTPDeviceMixin, ThrottlingTestMixin, TestCase):
"""Test TOTP Throttling"""
@@ -39,6 +39,10 @@ class AuthenticatorValidateStageSerializer(StageSerializer):
"webauthn_hints",
"webauthn_allowed_device_types",
"webauthn_allowed_device_types_obj",
"email_otp_throttling_factor",
"sms_otp_throttling_factor",
"totp_otp_throttling_factor",
"static_otp_throttling_factor",
]
@@ -3,6 +3,7 @@
from typing import TYPE_CHECKING
from urllib.parse import urlencode
from django.db import transaction
from django.http import HttpRequest
from django.http.response import Http404
from django.shortcuts import get_object_or_404
@@ -29,8 +30,8 @@ from authentik.flows.stage import StageView
from authentik.lib.utils.email import mask_email
from authentik.lib.utils.time import timedelta_from_string
from authentik.root.middleware import ClientIPMiddleware
from authentik.stages.authenticator import match_token
from authentik.stages.authenticator.models import Device
from authentik.stages.authenticator import devices_for_user
from authentik.stages.authenticator.models import Device, ThrottlingMixin
from authentik.stages.authenticator_duo.models import AuthenticatorDuoStage, DuoDevice
from authentik.stages.authenticator_email.models import EmailDevice
from authentik.stages.authenticator_sms.models import SMSDevice
@@ -143,7 +144,20 @@ def select_challenge_email(request: HttpRequest, device: EmailDevice):
def validate_challenge_code(code: str, stage_view: StageView, user: User) -> Device:
"""Validate code-based challenges. We test against every device, on purpose, as
the user mustn't choose between totp and static devices."""
device = match_token(user, code)
with transaction.atomic():
for device in devices_for_user(user, for_verify=True):
if isinstance(device, ThrottlingMixin):
throttling_factor = stage_view.executor.current_stage.get_throttling_factor(
DeviceClasses.from_model_label(device.model_label())
)
if throttling_factor is not None:
device.set_throttle_factor(throttling_factor)
if device.verify_token(code):
break
else:
device = None
if not device:
login_failed.send(
sender=__name__,
@@ -0,0 +1,36 @@
# Generated by Django 5.2.12 on 2026-04-16 16:33
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
(
"authentik_stages_authenticator_validate",
"0015_authenticatorvalidatestage_webauthn_hints",
),
]
operations = [
migrations.AddField(
model_name="authenticatorvalidatestage",
name="email_otp_throttling_factor",
field=models.FloatField(default=1),
),
migrations.AddField(
model_name="authenticatorvalidatestage",
name="sms_otp_throttling_factor",
field=models.FloatField(default=1),
),
migrations.AddField(
model_name="authenticatorvalidatestage",
name="static_otp_throttling_factor",
field=models.FloatField(default=1),
),
migrations.AddField(
model_name="authenticatorvalidatestage",
name="totp_otp_throttling_factor",
field=models.FloatField(default=1),
),
]
@@ -22,6 +22,12 @@ class DeviceClasses(models.TextChoices):
SMS = "sms", _("SMS")
EMAIL = "email", _("Email")
@staticmethod
def from_model_label(model_label: str) -> DeviceClasses:
return getattr(
DeviceClasses, model_label.rsplit(".", maxsplit=1)[-1][: -len("device")].upper()
)
def default_device_classes() -> list:
"""By default, accept all device classes"""
@@ -82,6 +88,11 @@ class AuthenticatorValidateStage(Stage):
"authentik_stages_authenticator_webauthn.WebAuthnDeviceType", blank=True
)
email_otp_throttling_factor = models.FloatField(default=1)
sms_otp_throttling_factor = models.FloatField(default=1)
totp_otp_throttling_factor = models.FloatField(default=1)
static_otp_throttling_factor = models.FloatField(default=1)
@property
def serializer(self) -> type[BaseSerializer]:
from authentik.stages.authenticator_validate.api import AuthenticatorValidateStageSerializer
@@ -98,6 +109,17 @@ class AuthenticatorValidateStage(Stage):
def component(self) -> str:
return "ak-stage-authenticator-validate-form"
def get_throttling_factor(self, device_class: DeviceClasses) -> float | None:
if device_class == DeviceClasses.EMAIL:
return self.email_otp_throttling_factor
elif device_class == DeviceClasses.SMS:
return self.sms_otp_throttling_factor
elif device_class == DeviceClasses.TOTP:
return self.totp_otp_throttling_factor
elif device_class == DeviceClasses.STATIC:
return self.static_otp_throttling_factor
return None
class Meta:
verbose_name = _("Authenticator Validation Stage")
verbose_name_plural = _("Authenticator Validation Stages")
@@ -0,0 +1,247 @@
from django.test import TestCase
from django.test.client import RequestFactory
from django.urls.base import reverse
from rest_framework.exceptions import ValidationError
from authentik.core.tests.utils import create_test_admin_user, create_test_flow
from authentik.flows.models import FlowStageBinding
from authentik.flows.stage import StageView
from authentik.flows.tests import FlowTestCase
from authentik.flows.views.executor import FlowExecutorView
from authentik.lib.generators import generate_id
from authentik.stages.authenticator_email.models import AuthenticatorEmailStage, EmailDevice
from authentik.stages.authenticator_sms.models import (
AuthenticatorSMSStage,
SMSDevice,
SMSProviders,
)
from authentik.stages.authenticator_validate.challenge import validate_challenge_code
from authentik.stages.authenticator_validate.models import (
AuthenticatorValidateStage,
DeviceClasses,
)
from authentik.stages.identification.models import IdentificationStage, UserFields
class DeviceClassesHelperTests(TestCase):
"""Tests for the DeviceClasses.from_model_label helper."""
def test_from_model_label_all_classes(self):
cases = {
"authentik_stages_authenticator_email.emaildevice": DeviceClasses.EMAIL,
"authentik_stages_authenticator_sms.smsdevice": DeviceClasses.SMS,
"authentik_stages_authenticator_totp.totpdevice": DeviceClasses.TOTP,
"authentik_stages_authenticator_static.staticdevice": DeviceClasses.STATIC,
"authentik_stages_authenticator_duo.duodevice": DeviceClasses.DUO,
"authentik_stages_authenticator_webauthn.webauthndevice": DeviceClasses.WEBAUTHN,
}
for label, expected in cases.items():
with self.subTest(label=label):
self.assertEqual(DeviceClasses.from_model_label(label), expected)
class AuthenticatorValidateStageFactorTests(TestCase):
"""Tests for AuthenticatorValidateStage.get_throttling_factor."""
def test_per_class_factors_returned(self):
stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
email_otp_throttling_factor=5,
sms_otp_throttling_factor=6,
totp_otp_throttling_factor=7,
static_otp_throttling_factor=8,
)
self.assertEqual(stage.get_throttling_factor(DeviceClasses.EMAIL), 5)
self.assertEqual(stage.get_throttling_factor(DeviceClasses.SMS), 6)
self.assertEqual(stage.get_throttling_factor(DeviceClasses.TOTP), 7)
self.assertEqual(stage.get_throttling_factor(DeviceClasses.STATIC), 8)
def test_no_factor_for_webauthn_or_duo(self):
stage = AuthenticatorValidateStage.objects.create(name=generate_id())
self.assertIsNone(stage.get_throttling_factor(DeviceClasses.WEBAUTHN))
self.assertIsNone(stage.get_throttling_factor(DeviceClasses.DUO))
class ValidateChallengeCodeThrottlingTests(FlowTestCase):
"""Tests for validate_challenge_code throttling behavior."""
def setUp(self) -> None:
super().setUp()
self.user = create_test_admin_user()
self.request_factory = RequestFactory()
self.email_stage = AuthenticatorEmailStage.objects.create(
name="email-stage-validate-throttle",
use_global_settings=True,
from_address="test@authentik.local",
token_expiry="minutes=30",
) # nosec
self.sms_stage = AuthenticatorSMSStage.objects.create(
name="sms-stage-validate-throttle",
provider=SMSProviders.GENERIC,
from_number="1234",
)
def _validate_stage(self, **factors) -> AuthenticatorValidateStage:
return AuthenticatorValidateStage.objects.create(
name=generate_id(),
device_classes=[
DeviceClasses.EMAIL,
DeviceClasses.SMS,
DeviceClasses.TOTP,
DeviceClasses.STATIC,
],
**factors,
)
def _stage_view(self, validate_stage: AuthenticatorValidateStage) -> StageView:
request = self.request_factory.get("/")
return StageView(FlowExecutorView(current_stage=validate_stage), request=request)
def _email_device(self, email: str = "throttle@authentik.local") -> EmailDevice:
return EmailDevice.objects.create(
user=self.user,
stage=self.email_stage,
confirmed=True,
email=email,
)
def _sms_device(self, phone_number: str = "+15551230101") -> SMSDevice:
return SMSDevice.objects.create(
user=self.user,
stage=self.sms_stage,
confirmed=True,
phone_number=phone_number,
)
def test_stage_factor_applied_to_email_device(self):
"""The stage's email_otp_throttling_factor is pushed onto the device before verify."""
stage = self._validate_stage(email_otp_throttling_factor=3)
device = self._email_device()
device.generate_token()
with self.assertRaises(ValidationError):
validate_challenge_code("000000", self._stage_view(stage), self.user)
device.refresh_from_db()
self.assertEqual(device.throttling_failure_count, 1)
# verify_is_allowed must compute the delay using factor=3 (3 * 2^0 = 3s).
device.set_throttle_factor(3)
allowed, data = device.verify_is_allowed()
self.assertFalse(allowed)
required = data["locked_until"] - device.throttling_failure_timestamp
self.assertAlmostEqual(required.total_seconds(), 3, places=3)
def test_factor_zero_disables_throttling_end_to_end(self):
"""With email_otp_throttling_factor=0, repeated failures do not lock the device."""
stage = self._validate_stage(email_otp_throttling_factor=0)
device = self._email_device()
device.generate_token()
token = device.token
for _ in range(10):
with self.assertRaises(ValidationError):
validate_challenge_code("000000", self._stage_view(stage), self.user)
matched = validate_challenge_code(token, self._stage_view(stage), self.user)
self.assertEqual(matched.pk, device.pk)
def test_lockout_persists_across_calls(self):
"""
A correct token on the second call is still blocked and does not increment the counter.
"""
stage = self._validate_stage(email_otp_throttling_factor=1)
device = self._email_device()
device.generate_token()
token = device.token
invalid_token = "000000" if token != "000000" else "111111" # nosec
with self.assertRaises(ValidationError):
validate_challenge_code(invalid_token, self._stage_view(stage), self.user)
# Immediately try with the correct token: lockout still active, attempt must be rejected.
with self.assertRaises(ValidationError):
validate_challenge_code(token, self._stage_view(stage), self.user)
device.refresh_from_db()
# Token wasn't consumed (verification never ran), and counter didn't get incremented.
self.assertEqual(device.token, token)
self.assertEqual(device.throttling_failure_count, 1)
class ValidateStageThrottlingFlowTests(FlowTestCase):
"""End-to-end lockout behavior through the flow executor HTTP API."""
def setUp(self) -> None:
super().setUp()
self.user = create_test_admin_user()
self.email_stage = AuthenticatorEmailStage.objects.create(
name="email-stage-flow-throttle",
use_global_settings=True,
from_address="test@authentik.local",
token_expiry="minutes=30",
) # nosec
self.ident_stage = IdentificationStage.objects.create(
name=generate_id(),
user_fields=[UserFields.USERNAME],
)
self.validate_stage = AuthenticatorValidateStage.objects.create(
name=generate_id(),
device_classes=[DeviceClasses.EMAIL],
email_otp_throttling_factor=1,
)
self.flow = create_test_flow()
FlowStageBinding.objects.create(target=self.flow, stage=self.ident_stage, order=0)
FlowStageBinding.objects.create(target=self.flow, stage=self.validate_stage, order=1)
def _identify(self):
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{"uid_field": self.user.username},
follow=True,
)
self.assertEqual(response.status_code, 200)
def _select_email(self, device: EmailDevice):
self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{
"component": "ak-stage-authenticator-validate",
"selected_challenge": {
"device_class": "email",
"device_uid": str(device.pk),
"challenge": {},
"last_used": None,
},
},
)
def test_bad_code_then_correct_code_is_still_blocked(self):
"""After a bad code over HTTP, a subsequent correct code is still rejected
because the lockout persists in the database."""
device = EmailDevice.objects.create(
user=self.user,
confirmed=True,
stage=self.email_stage,
email="throttle-flow@authentik.local",
)
self._identify()
self._select_email(device)
# Server generated and stored the token - grab it from DB.
device.refresh_from_db()
token = device.token
# First attempt: bad code - must increment the DB counter.
self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{"component": "ak-stage-authenticator-validate", "code": "000000"},
)
device.refresh_from_db()
self.assertEqual(device.throttling_failure_count, 1)
self.assertEqual(device.token, token)
# Second attempt with the correct token - still blocked.
response = self.client.post(
reverse("authentik_api:flow-executor", kwargs={"flow_slug": self.flow.slug}),
{"component": "ak-stage-authenticator-validate", "code": token},
)
self.assertStageResponse(
response,
flow=self.flow,
component="ak-stage-authenticator-validate",
)
device.refresh_from_db()
# Counter wasn't incremented on a blocked attempt
self.assertEqual(device.throttling_failure_count, 1)
# Token wasn't consumed.
self.assertEqual(device.token, token)
+16
View File
@@ -14936,6 +14936,22 @@
"format": "uuid"
},
"title": "Webauthn allowed device types"
},
"email_otp_throttling_factor": {
"type": "number",
"title": "Email otp throttling factor"
},
"sms_otp_throttling_factor": {
"type": "number",
"title": "Sms otp throttling factor"
},
"totp_otp_throttling_factor": {
"type": "number",
"title": "Totp otp throttling factor"
},
"static_otp_throttling_factor": {
"type": "number",
"title": "Static otp throttling factor"
}
},
"required": []
@@ -124,6 +124,30 @@ export interface AuthenticatorValidateStage {
* @memberof AuthenticatorValidateStage
*/
readonly webauthnAllowedDeviceTypesObj: Array<WebAuthnDeviceType>;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStage
*/
emailOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStage
*/
smsOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStage
*/
totpOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStage
*/
staticOtpThrottlingFactor?: number;
}
/**
@@ -193,6 +217,22 @@ export function AuthenticatorValidateStageFromJSONTyped(
webauthnAllowedDeviceTypesObj: (
json["webauthn_allowed_device_types_obj"] as Array<any>
).map(WebAuthnDeviceTypeFromJSON),
emailOtpThrottlingFactor:
json["email_otp_throttling_factor"] == null
? undefined
: json["email_otp_throttling_factor"],
smsOtpThrottlingFactor:
json["sms_otp_throttling_factor"] == null
? undefined
: json["sms_otp_throttling_factor"],
totpOtpThrottlingFactor:
json["totp_otp_throttling_factor"] == null
? undefined
: json["totp_otp_throttling_factor"],
staticOtpThrottlingFactor:
json["static_otp_throttling_factor"] == null
? undefined
: json["static_otp_throttling_factor"],
};
}
@@ -232,5 +272,9 @@ export function AuthenticatorValidateStageToJSONTyped(
? undefined
: (value["webauthnHints"] as Array<any>).map(WebAuthnHintEnumToJSON),
webauthn_allowed_device_types: value["webauthnAllowedDeviceTypes"],
email_otp_throttling_factor: value["emailOtpThrottlingFactor"],
sms_otp_throttling_factor: value["smsOtpThrottlingFactor"],
totp_otp_throttling_factor: value["totpOtpThrottlingFactor"],
static_otp_throttling_factor: value["staticOtpThrottlingFactor"],
};
}
@@ -78,6 +78,30 @@ export interface AuthenticatorValidateStageRequest {
* @memberof AuthenticatorValidateStageRequest
*/
webauthnAllowedDeviceTypes?: Array<string>;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStageRequest
*/
emailOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStageRequest
*/
smsOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStageRequest
*/
totpOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof AuthenticatorValidateStageRequest
*/
staticOtpThrottlingFactor?: number;
}
/**
@@ -129,6 +153,22 @@ export function AuthenticatorValidateStageRequestFromJSONTyped(
json["webauthn_allowed_device_types"] == null
? undefined
: json["webauthn_allowed_device_types"],
emailOtpThrottlingFactor:
json["email_otp_throttling_factor"] == null
? undefined
: json["email_otp_throttling_factor"],
smsOtpThrottlingFactor:
json["sms_otp_throttling_factor"] == null
? undefined
: json["sms_otp_throttling_factor"],
totpOtpThrottlingFactor:
json["totp_otp_throttling_factor"] == null
? undefined
: json["totp_otp_throttling_factor"],
staticOtpThrottlingFactor:
json["static_otp_throttling_factor"] == null
? undefined
: json["static_otp_throttling_factor"],
};
}
@@ -161,5 +201,9 @@ export function AuthenticatorValidateStageRequestToJSONTyped(
? undefined
: (value["webauthnHints"] as Array<any>).map(WebAuthnHintEnumToJSON),
webauthn_allowed_device_types: value["webauthnAllowedDeviceTypes"],
email_otp_throttling_factor: value["emailOtpThrottlingFactor"],
sms_otp_throttling_factor: value["smsOtpThrottlingFactor"],
totp_otp_throttling_factor: value["totpOtpThrottlingFactor"],
static_otp_throttling_factor: value["staticOtpThrottlingFactor"],
};
}
@@ -78,6 +78,30 @@ export interface PatchedAuthenticatorValidateStageRequest {
* @memberof PatchedAuthenticatorValidateStageRequest
*/
webauthnAllowedDeviceTypes?: Array<string>;
/**
*
* @type {number}
* @memberof PatchedAuthenticatorValidateStageRequest
*/
emailOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof PatchedAuthenticatorValidateStageRequest
*/
smsOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof PatchedAuthenticatorValidateStageRequest
*/
totpOtpThrottlingFactor?: number;
/**
*
* @type {number}
* @memberof PatchedAuthenticatorValidateStageRequest
*/
staticOtpThrottlingFactor?: number;
}
/**
@@ -128,6 +152,22 @@ export function PatchedAuthenticatorValidateStageRequestFromJSONTyped(
json["webauthn_allowed_device_types"] == null
? undefined
: json["webauthn_allowed_device_types"],
emailOtpThrottlingFactor:
json["email_otp_throttling_factor"] == null
? undefined
: json["email_otp_throttling_factor"],
smsOtpThrottlingFactor:
json["sms_otp_throttling_factor"] == null
? undefined
: json["sms_otp_throttling_factor"],
totpOtpThrottlingFactor:
json["totp_otp_throttling_factor"] == null
? undefined
: json["totp_otp_throttling_factor"],
staticOtpThrottlingFactor:
json["static_otp_throttling_factor"] == null
? undefined
: json["static_otp_throttling_factor"],
};
}
@@ -160,5 +200,9 @@ export function PatchedAuthenticatorValidateStageRequestToJSONTyped(
? undefined
: (value["webauthnHints"] as Array<any>).map(WebAuthnHintEnumToJSON),
webauthn_allowed_device_types: value["webauthnAllowedDeviceTypes"],
email_otp_throttling_factor: value["emailOtpThrottlingFactor"],
sms_otp_throttling_factor: value["smsOtpThrottlingFactor"],
totp_otp_throttling_factor: value["totpOtpThrottlingFactor"],
static_otp_throttling_factor: value["staticOtpThrottlingFactor"],
};
}
+36
View File
@@ -35613,6 +35613,18 @@ components:
items:
$ref: '#/components/schemas/WebAuthnDeviceType'
readOnly: true
email_otp_throttling_factor:
type: number
format: double
sms_otp_throttling_factor:
type: number
format: double
totp_otp_throttling_factor:
type: number
format: double
static_otp_throttling_factor:
type: number
format: double
required:
- component
- flow_set
@@ -35662,6 +35674,18 @@ components:
items:
type: string
format: uuid
email_otp_throttling_factor:
type: number
format: double
sms_otp_throttling_factor:
type: number
format: double
totp_otp_throttling_factor:
type: number
format: double
static_otp_throttling_factor:
type: number
format: double
required:
- name
AuthenticatorValidationChallenge:
@@ -48095,6 +48119,18 @@ components:
items:
type: string
format: uuid
email_otp_throttling_factor:
type: number
format: double
sms_otp_throttling_factor:
type: number
format: double
totp_otp_throttling_factor:
type: number
format: double
static_otp_throttling_factor:
type: number
format: double
PatchedAuthenticatorWebAuthnStageRequest:
type: object
description: AuthenticatorWebAuthnStage Serializer
@@ -232,6 +232,70 @@ export class AuthenticatorValidateStageForm extends BaseStageForm<AuthenticatorV
: nothing}
</div>
</ak-form-group>
<ak-form-group label="${msg("Throttling settings")}">
<ak-form-element-horizontal
label=${msg("Email OTP throttling factor")}
required
name="emailOtpThrottlingFactor"
>
<input
type="number"
step="0.1"
value=${this.instance?.emailOtpThrottlingFactor || 1}
class="pf-c-form-control pf-m-monospace"
autocomplete="off"
spellcheck="false"
required
/>
</ak-form-element-horizontal>
<ak-form-element-horizontal
label=${msg("SMS OTP throttling factor")}
required
name="smsOtpThrottlingFactor"
>
<input
type="number"
step="0.1"
value=${this.instance?.smsOtpThrottlingFactor || 1}
class="pf-c-form-control pf-m-monospace"
autocomplete="off"
spellcheck="false"
required
/>
</ak-form-element-horizontal>
<ak-form-element-horizontal
label=${msg("TOTP throttling factor")}
required
name="totpOtpThrottlingFactor"
>
<input
type="number"
step="0.1"
value=${this.instance?.totpOtpThrottlingFactor || 1}
class="pf-c-form-control pf-m-monospace"
autocomplete="off"
spellcheck="false"
required
/>
</ak-form-element-horizontal>
<ak-form-element-horizontal
label=${msg("Static OTP throttling factor")}
required
name="staticOtpThrottlingFactor"
>
<input
type="number"
step="0.1"
value=${this.instance?.staticOtpThrottlingFactor || 1}
class="pf-c-form-control pf-m-monospace"
autocomplete="off"
spellcheck="false"
required
/>
</ak-form-element-horizontal>
</ak-form-group>
<ak-form-group open label="${msg("WebAuthn-specific settings")}">
<div class="pf-c-form">
<ak-form-element-horizontal
@@ -152,3 +152,26 @@ These restrictions only apply to WebAuthn devices created with authentik 2024.4
#### Automatic device selection
If the user has more than one device, the user is prompted to select which device they want to use for validation. After the user successfully authenticates with a certain device, that device is marked as "last used". In subsequent prompts by the Authenticator validation stage, the last used device is automatically selected for the user. Should they wish to use another device, the user can return to the device selection screen.
#### Throttling
To slow down brute-force attacks against code-based authentication methods, the stage applies an exponential back-off to each device after a failed verification attempt. The delay required between verification attempts grows with each successive failure, as:
```
delay_seconds = factor * 2^(n - 1)
```
where `factor` is the per-device-class throttling factor configured on the stage and `n` is the number of successive failures on that device.
For example, with the default factor of `1`, the required delay between attempts is `1, 2, 4, 8, 16, ...` seconds. With a factor of `0.5`, it is `0.5, 1, 2, 4, ...` seconds. A successful verification resets the counter.
A factor of `0` disables throttling entirely for that device class.
The following fields are available:
- **Email OTP throttling factor**: applies to Email devices.
- **SMS OTP throttling factor**: applies to SMS devices.
- **TOTP throttling factor**: applies to TOTP devices.
- **Static OTP throttling factor**: applies to static recovery codes.
WebAuthn and Duo devices are not throttled.