sources/sync: configuration for outgoing sync trigger mode (#17669)

* sources/sync: configuration for outgoing sync trigger mode

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>

* lint

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>

* api and frontend

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>

* fix tests

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>

* update migrations

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>

* Wrap `msg` calls in function to fix translation. Update props to accept
callbacks.

---------

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
Co-authored-by: Teffen Ellis <teffen@goauthentik.io>
This commit is contained in:
Marc 'risson' Schmitt
2025-12-10 16:40:32 +01:00
committed by GitHub
parent b4b89e9633
commit 92c5efbac1
20 changed files with 280 additions and 17 deletions
+1 -1
View File
@@ -152,7 +152,7 @@ class AttributesMixin(models.Model):
@classmethod
def update_or_create_attributes(
cls, query: dict[str, Any], properties: dict[str, Any]
) -> tuple[models.Model, bool]:
) -> tuple[Self, bool]:
"""Same as django's update_or_create but correctly updates attributes by merging dicts"""
instance = cls.objects.filter(**query).first()
if not instance:
+1 -1
View File
@@ -39,7 +39,7 @@ def source_tester_factory(test_model: type[Source]) -> Callable:
def tester(self: TestModels):
model_class = None
if test_model._meta.abstract:
model_class = [x for x in test_model.__bases__ if issubclass(x, Source)][0]()
return
else:
model_class = test_model()
model_class.slug = "test"
+25
View File
@@ -0,0 +1,25 @@
from django.db import models
from django.utils.translation import gettext_lazy as _
from authentik.core.models import Source
from authentik.tasks.schedules.models import ScheduledModel
class SyncOutgoingTriggerMode(models.TextChoices):
# Do not trigger outgoing syncs
NONE = "none"
# Trigger immediately after object changed
IMMEDIATE = "immediate"
# Trigger at the end of full sync
DEFERRED_END = "deferred_end"
class IncomingSyncSource(ScheduledModel, Source):
sync_outgoing_trigger_mode = models.TextField(
choices=SyncOutgoingTriggerMode.choices,
default=SyncOutgoingTriggerMode.DEFERRED_END,
help_text=_("When to trigger sync for outgoing providers"),
)
class Meta:
abstract = True
+4
View File
@@ -83,6 +83,10 @@ class OutgoingSyncProvider(ScheduledModel, Model):
def sync_actor(self) -> Actor:
raise NotImplementedError
def sync_dispatch(self) -> None:
for schedule in self.schedules:
schedule.send()
@property
def schedule_specs(self) -> list[ScheduleSpec]:
return [
+26
View File
@@ -1,3 +1,6 @@
from contextlib import contextmanager
from contextvars import ContextVar
from django.db.models import Model
from django.db.models.signals import m2m_changed, post_save, pre_delete
from dramatiq.actor import Actor
@@ -7,6 +10,23 @@ from authentik.lib.sync.outgoing.base import Direction
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.utils.reflection import class_to_path
_CTX_INHIBIT_DISPATCH = ContextVar[bool](
"authentik_sync_outgoing_inhibit_dispatch",
default=False,
)
@contextmanager
def sync_outgoing_inhibit_dispatch():
"""
Prevent direct and m2m tasks from being dispatched when User/Group/membership change
"""
_CTX_INHIBIT_DISPATCH.set(True)
try:
yield
finally:
_CTX_INHIBIT_DISPATCH.set(False)
def register_signals(
provider_type: type[OutgoingSyncProvider],
@@ -28,6 +48,8 @@ def register_signals(
# This primarily happens during user login
if sender == User and update_fields == {"last_login"}:
return
if _CTX_INHIBIT_DISPATCH.get():
return
if not provider_type.objects.exists():
return
task_sync_direct_dispatch.send(
@@ -41,6 +63,8 @@ def register_signals(
def model_pre_delete(sender: type[Model], instance: User | Group, **_):
"""Pre-delete handler"""
if _CTX_INHIBIT_DISPATCH.get():
return
if not provider_type.objects.exists():
return
task_sync_direct_dispatch.send(
@@ -58,6 +82,8 @@ def register_signals(
"""Sync group membership"""
if action not in ["post_add", "post_remove"]:
return
if _CTX_INHIBIT_DISPATCH.get():
return
if not provider_type.objects.exists():
return
task_sync_m2m_dispatch.send(instance.pk, action, list(pk_set), reverse)
+1
View File
@@ -44,6 +44,7 @@ class KerberosSourceSerializer(SourceSerializer):
"spnego_keytab",
"spnego_ccache",
"password_login_update_internal_password",
"sync_outgoing_trigger_mode",
]
extra_kwargs = {
"sync_password": {"write_only": True},
@@ -0,0 +1,26 @@
# Generated by Django 5.2.9 on 2025-12-08 13:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_sources_kerberos", "0003_migrate_userkerberossourceconnection_identifier"),
]
operations = [
migrations.AddField(
model_name="kerberossource",
name="sync_outgoing_trigger_mode",
field=models.TextField(
choices=[
("none", "None"),
("immediate", "Immediate"),
("deferred_end", "Deferred End"),
],
default="deferred_end",
help_text="When to trigger sync for outgoing providers",
),
),
]
+2 -3
View File
@@ -22,15 +22,14 @@ from structlog.stdlib import get_logger
from authentik.core.models import (
GroupSourceConnection,
PropertyMapping,
Source,
UserSourceConnection,
UserTypes,
)
from authentik.core.types import UILoginButton, UserSettingSerializer
from authentik.flows.challenge import RedirectChallenge
from authentik.lib.sync.incoming.models import IncomingSyncSource
from authentik.lib.utils.time import fqdn_rand
from authentik.tasks.schedules.common import ScheduleSpec
from authentik.tasks.schedules.models import ScheduledModel
LOGGER = get_logger()
@@ -46,7 +45,7 @@ class KAdminType(models.TextChoices):
OTHER = "other"
class KerberosSource(ScheduledModel, Source):
class KerberosSource(IncomingSyncSource):
"""Federate Kerberos realm with authentik"""
realm = models.TextField(help_text=_("Kerberos realm"), unique=True)
+13 -1
View File
@@ -6,7 +6,11 @@ from dramatiq.actor import actor
from structlog.stdlib import get_logger
from authentik.lib.config import CONFIG
from authentik.lib.sync.incoming.models import SyncOutgoingTriggerMode
from authentik.lib.sync.outgoing.exceptions import StopSync
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch
from authentik.lib.utils.reflection import all_subclasses
from authentik.sources.kerberos.models import KerberosSource
from authentik.sources.kerberos.sync import KerberosSync
from authentik.tasks.middleware import CurrentTask
@@ -45,7 +49,15 @@ def kerberos_sync(pk: str):
)
return
syncer = KerberosSync(source, self)
syncer.sync()
if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.IMMEDIATE:
syncer.sync()
else:
with sync_outgoing_inhibit_dispatch():
syncer.sync()
if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.DEFERRED_END:
for outgoing_sync_provider_cls in all_subclasses(OutgoingSyncProvider):
for provider in outgoing_sync_provider_cls.objects.all():
provider.sync_dispatch()
except StopSync as exc:
LOGGER.warning("Error syncing kerberos", exc=exc, source=source)
self.error(exc)
+1
View File
@@ -114,6 +114,7 @@ class LDAPSourceSerializer(SourceSerializer):
"connectivity",
"lookup_groups_from_user",
"delete_not_found_objects",
"sync_outgoing_trigger_mode",
]
extra_kwargs = {"bind_password": {"write_only": True}}
@@ -0,0 +1,26 @@
# Generated by Django 5.2.9 on 2025-12-08 13:45
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
("authentik_sources_ldap", "0010_ldapsource_user_membership_attribute"),
]
operations = [
migrations.AddField(
model_name="ldapsource",
name="sync_outgoing_trigger_mode",
field=models.TextField(
choices=[
("none", "None"),
("immediate", "Immediate"),
("deferred_end", "Deferred End"),
],
default="deferred_end",
help_text="When to trigger sync for outgoing providers",
),
),
]
+2 -3
View File
@@ -19,15 +19,14 @@ from authentik.core.models import (
Group,
GroupSourceConnection,
PropertyMapping,
Source,
UserSourceConnection,
)
from authentik.crypto.models import CertificateKeyPair
from authentik.lib.config import CONFIG
from authentik.lib.models import DomainlessURLValidator
from authentik.lib.sync.incoming.models import IncomingSyncSource
from authentik.lib.utils.time import fqdn_rand
from authentik.tasks.schedules.common import ScheduleSpec
from authentik.tasks.schedules.models import ScheduledModel
LDAP_TIMEOUT = 15
LDAP_UNIQUENESS = "ldap_uniq"
@@ -56,7 +55,7 @@ class MultiURLValidator(DomainlessURLValidator):
super().__call__(value)
class LDAPSource(ScheduledModel, Source):
class LDAPSource(IncomingSyncSource):
"""Federate LDAP Directory with authentik, or create new accounts in LDAP."""
server_uri = models.TextField(
+14 -2
View File
@@ -11,8 +11,11 @@ from ldap3.core.exceptions import LDAPException
from structlog.stdlib import get_logger
from authentik.lib.config import CONFIG
from authentik.lib.sync.incoming.models import SyncOutgoingTriggerMode
from authentik.lib.sync.outgoing.exceptions import StopSync
from authentik.lib.utils.reflection import class_to_path, path_to_class
from authentik.lib.sync.outgoing.models import OutgoingSyncProvider
from authentik.lib.sync.outgoing.signals import sync_outgoing_inhibit_dispatch
from authentik.lib.utils.reflection import all_subclasses, class_to_path, path_to_class
from authentik.sources.ldap.models import LDAPSource
from authentik.sources.ldap.sync.base import BaseLDAPSynchronizer
from authentik.sources.ldap.sync.forward_delete_groups import GroupLDAPForwardDeletion
@@ -102,6 +105,11 @@ def ldap_sync(source_pk: str):
timeout=60 * 60 * CONFIG.get_int("ldap.task_timeout_hours") * 1000,
)
if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.DEFERRED_END:
for outgoing_sync_provider_cls in all_subclasses(OutgoingSyncProvider):
for provider in outgoing_sync_provider_cls.objects.all():
provider.sync_dispatch()
def ldap_sync_paginator(
task: Task, source: LDAPSource, sync: type[BaseLDAPSynchronizer]
@@ -147,7 +155,11 @@ def ldap_sync_page(source_pk: str, sync_class: str, page_cache_key: str):
self.error(error_message)
return
cache.touch(page_cache_key)
count = sync_inst.sync(page)
if source.sync_outgoing_trigger_mode == SyncOutgoingTriggerMode.IMMEDIATE:
count = sync_inst.sync(page)
else:
with sync_outgoing_inhibit_dispatch():
count = sync_inst.sync(page)
self.info(f"Synced {count} objects.")
cache.delete(page_cache_key)
except (LDAPException, StopSync) as exc:
+20
View File
@@ -11334,6 +11334,16 @@
"type": "boolean",
"title": "Password login update internal password",
"description": "If enabled, the authentik-stored password will be updated upon login with the Kerberos password backend"
},
"sync_outgoing_trigger_mode": {
"type": "string",
"enum": [
"none",
"immediate",
"deferred_end"
],
"title": "Sync outgoing trigger mode",
"description": "When to trigger sync for outgoing providers"
}
},
"required": []
@@ -11699,6 +11709,16 @@
"type": "boolean",
"title": "Delete not found objects",
"description": "Delete authentik users and groups which were previously supplied by this source, but are now missing from it."
},
"sync_outgoing_trigger_mode": {
"type": "string",
"enum": [
"none",
"immediate",
"deferred_end"
],
"title": "Sync outgoing trigger mode",
"description": "When to trigger sync for outgoing providers"
}
},
"required": []
+30
View File
@@ -39803,6 +39803,10 @@ components:
type: boolean
description: If enabled, the authentik-stored password will be updated upon
login with the Kerberos password backend
sync_outgoing_trigger_mode:
allOf:
- $ref: '#/components/schemas/SyncOutgoingTriggerModeEnum'
description: When to trigger sync for outgoing providers
required:
- component
- connectivity
@@ -39987,6 +39991,10 @@ components:
type: boolean
description: If enabled, the authentik-stored password will be updated upon
login with the Kerberos password backend
sync_outgoing_trigger_mode:
allOf:
- $ref: '#/components/schemas/SyncOutgoingTriggerModeEnum'
description: When to trigger sync for outgoing providers
required:
- name
- realm
@@ -40511,6 +40519,10 @@ components:
type: boolean
description: Delete authentik users and groups which were previously supplied
by this source, but are now missing from it.
sync_outgoing_trigger_mode:
allOf:
- $ref: '#/components/schemas/SyncOutgoingTriggerModeEnum'
description: When to trigger sync for outgoing providers
required:
- base_dn
- component
@@ -40725,6 +40737,10 @@ components:
type: boolean
description: Delete authentik users and groups which were previously supplied
by this source, but are now missing from it.
sync_outgoing_trigger_mode:
allOf:
- $ref: '#/components/schemas/SyncOutgoingTriggerModeEnum'
description: When to trigger sync for outgoing providers
required:
- base_dn
- name
@@ -46867,6 +46883,10 @@ components:
type: boolean
description: If enabled, the authentik-stored password will be updated upon
login with the Kerberos password backend
sync_outgoing_trigger_mode:
allOf:
- $ref: '#/components/schemas/SyncOutgoingTriggerModeEnum'
description: When to trigger sync for outgoing providers
PatchedKubernetesServiceConnectionRequest:
type: object
description: KubernetesServiceConnection Serializer
@@ -47101,6 +47121,10 @@ components:
type: boolean
description: Delete authentik users and groups which were previously supplied
by this source, but are now missing from it.
sync_outgoing_trigger_mode:
allOf:
- $ref: '#/components/schemas/SyncOutgoingTriggerModeEnum'
description: When to trigger sync for outgoing providers
PatchedLicenseRequest:
type: object
description: License Serializer
@@ -53702,6 +53726,12 @@ components:
readOnly: true
required:
- messages
SyncOutgoingTriggerModeEnum:
enum:
- none
- immediate
- deferred_end
type: string
SyncStatus:
type: object
description: Provider/source sync status
@@ -2,6 +2,7 @@ import "#admin/common/ak-flow-search/ak-source-flow-search";
import "#components/ak-secret-text-input";
import "#components/ak-secret-textarea-input";
import "#components/ak-slug-input";
import "#components/ak-radio-input";
import "#components/ak-file-search-input";
import "#components/ak-switch-input";
import "#components/ak-text-input";
@@ -15,6 +16,8 @@ import { propertyMappingsProvider, propertyMappingsSelector } from "./KerberosSo
import { DEFAULT_CONFIG } from "#common/api/config";
import { RadioOption } from "#elements/forms/Radio";
import { iconHelperText, placeholderHelperText } from "#admin/helperText";
import { BaseSourceForm } from "#admin/sources/BaseSourceForm";
import { GroupMatchingModeToLabel, UserMatchingModeToLabel } from "#admin/sources/oauth/utils";
@@ -27,6 +30,7 @@ import {
KerberosSource,
KerberosSourceRequest,
SourcesApi,
SyncOutgoingTriggerModeEnum,
UserMatchingModeEnum,
} from "@goauthentik/api";
@@ -35,6 +39,31 @@ import { html, TemplateResult } from "lit";
import { customElement } from "lit/decorators.js";
import { ifDefined } from "lit/directives/if-defined.js";
function createSyncOutgoingTriggerModeOptions(): RadioOption<SyncOutgoingTriggerModeEnum>[] {
return [
{
label: msg("None"),
value: SyncOutgoingTriggerModeEnum.None,
description: html`${msg("Outgoing syncs will not be triggered.")}`,
},
{
label: msg("Immediate"),
value: SyncOutgoingTriggerModeEnum.Immediate,
description: html`${msg(
"Outgoing syncs will be triggered immediately for each object that is updated. This can create many background tasks and is therefore not recommended",
)}`,
},
{
label: msg("Deferred until end"),
value: SyncOutgoingTriggerModeEnum.DeferredEnd,
default: true,
description: html`${msg(
"Outgoing syncs will be triggered at the end of the source synchronization.",
)}`,
},
];
}
@customElement("ak-source-kerberos-form")
export class KerberosSourceForm extends BaseSourceForm<KerberosSource> {
async loadInstance(pk: string): Promise<KerberosSource> {
@@ -365,6 +394,14 @@ export class KerberosSourceForm extends BaseSourceForm<KerberosSource> {
help=${placeholderHelperText}
></ak-text-input>
</div>
<ak-radio-input
label=${msg("Outgoing sync trigger mode")}
required
name="type"
.value=${this.instance?.syncOutgoingTriggerMode}
.options=${createSyncOutgoingTriggerModeOptions}
>
</ak-radio-input>
<ak-file-search-input
name="icon"
label=${msg("Icon")}
@@ -1,6 +1,7 @@
import "#admin/common/ak-crypto-certificate-search";
import "#components/ak-secret-text-input";
import "#components/ak-slug-input";
import "#components/ak-radio-input";
import "#elements/ak-dual-select/ak-dual-select-dynamic-selected-provider";
import "#elements/forms/FormGroup";
import "#elements/forms/HorizontalFormElement";
@@ -10,6 +11,8 @@ import { propertyMappingsProvider, propertyMappingsSelector } from "./LDAPSource
import { DEFAULT_CONFIG } from "#common/api/config";
import { RadioOption } from "#elements/forms/Radio";
import { placeholderHelperText } from "#admin/helperText";
import { BaseSourceForm } from "#admin/sources/BaseSourceForm";
@@ -20,6 +23,7 @@ import {
LDAPSource,
LDAPSourceRequest,
SourcesApi,
SyncOutgoingTriggerModeEnum,
} from "@goauthentik/api";
import { msg } from "@lit/localize";
@@ -27,6 +31,30 @@ import { html, TemplateResult } from "lit";
import { customElement } from "lit/decorators.js";
import { ifDefined } from "lit/directives/if-defined.js";
function createSyncOutgoingTriggerModeOptions(): RadioOption<SyncOutgoingTriggerModeEnum>[] {
return [
{
label: msg("None"),
value: SyncOutgoingTriggerModeEnum.None,
description: html`${msg("Outgoing syncs will not be triggered.")}`,
},
{
label: msg("Immediate"),
value: SyncOutgoingTriggerModeEnum.Immediate,
description: html`${msg(
"Outgoing syncs will be triggered immediately for each object that is updated. This can create many background tasks and is therefore not recommended",
)}`,
},
{
label: msg("Deferred until end"),
value: SyncOutgoingTriggerModeEnum.DeferredEnd,
default: true,
description: html`${msg(
"Outgoing syncs will be triggered at the end of the source synchronization.",
)}`,
},
];
}
@customElement("ak-source-ldap-form")
export class LDAPSourceForm extends BaseSourceForm<LDAPSource> {
loadInstance(pk: string): Promise<LDAPSource> {
@@ -481,6 +509,14 @@ export class LDAPSourceForm extends BaseSourceForm<LDAPSource> {
${msg("Field which contains a unique Identifier.")}
</p>
</ak-form-element-horizontal>
<ak-radio-input
label=${msg("Outgoing sync trigger mode")}
required
name="type"
.value=${this.instance?.syncOutgoingTriggerMode}
.options=${createSyncOutgoingTriggerModeOptions}
>
</ak-radio-input>
</div>
</ak-form-group>`;
}
+3 -3
View File
@@ -13,10 +13,10 @@ export class AkRadioInput<T> extends HorizontalLightComponent<T> {
public override role = "radiogroup";
@property({ type: Object })
value!: T;
public value!: T;
@property({ type: Array })
options: RadioOption<T>[] = [];
@property({ attribute: false })
public options: RadioOption<T>[] | (() => RadioOption<T>[]) = [];
handleInput(ev: CustomEvent) {
if ("detail" in ev) {
+12 -3
View File
@@ -23,8 +23,13 @@ export interface RadioOption<T> {
@customElement("ak-radio")
export class Radio<T> extends CustomEmitterElement(AKElement) {
/**
* Options to display in the radio group.
*
* Can be either an array of RadioOption<T> or a function returning such an array.
*/
@property({ attribute: false })
public options: RadioOption<T>[] = [];
public options: RadioOption<T>[] | (() => RadioOption<T>[]) = [];
@property()
public name = "";
@@ -42,11 +47,15 @@ export class Radio<T> extends CustomEmitterElement(AKElement) {
Styles,
];
#optionsArray(): RadioOption<T>[] {
return typeof this.options === "function" ? this.options() : this.options;
}
// Set the value if it's not set already. Property changes inside the `willUpdate()` method do
// not trigger an element update.
willUpdate() {
if (!this.value) {
const maybeDefault = this.options.filter((opt) => opt.default);
const maybeDefault = this.#optionsArray().filter((opt) => opt.default);
if (maybeDefault.length > 0) {
this.value = maybeDefault[0].value;
}
@@ -103,7 +112,7 @@ export class Radio<T> extends CustomEmitterElement(AKElement) {
render() {
return html`<div class="pf-c-form__group-control pf-m-stack">
${map(this.options, this.#renderRadio)}
${map(this.#optionsArray(), this.#renderRadio)}
</div>`;
}
}