From b32df1751348f709e8120c66a11be69f3d4dd99e Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Wed, 6 May 2026 14:42:29 +0000 Subject: [PATCH] core: bump dramatiq from 1.17.1 to 2.1.0 (#22076) Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Marc 'risson' Schmitt --- authentik/tasks/test.py | 21 +++---- lifecycle/worker_process.py | 8 +-- .../django_dramatiq_postgres/apps.py | 15 ++--- .../django_dramatiq_postgres/broker.py | 55 +++++++++---------- .../django_dramatiq_postgres/forks.py | 3 +- .../django_dramatiq_postgres/middleware.py | 42 +++++++------- .../django_dramatiq_postgres/models.py | 2 +- .../django-dramatiq-postgres/pyproject.toml | 2 +- uv.lock | 11 ++-- 9 files changed, 75 insertions(+), 84 deletions(-) diff --git a/authentik/tasks/test.py b/authentik/tasks/test.py index f538b9dca4..7ee8f15797 100644 --- a/authentik/tasks/test.py +++ b/authentik/tasks/test.py @@ -7,7 +7,7 @@ from dramatiq.broker import Broker, MessageProxy, get_broker from dramatiq.middleware.middleware import Middleware from dramatiq.middleware.retries import Retries from dramatiq.results.middleware import Results -from dramatiq.worker import Worker, _ConsumerThread, _WorkerThread +from dramatiq.worker import ConsumerThread, Worker, WorkerThread from authentik.tasks.broker import PostgresBroker @@ -20,7 +20,7 @@ class TestWorker(Worker): self.worker_id = 1000 self.work_queue = PriorityQueue() self.consumers = { - TESTING_QUEUE: _ConsumerThread( + TESTING_QUEUE: ConsumerThread( broker=self.broker, queue_name=TESTING_QUEUE, prefetch=2, @@ -33,7 +33,7 @@ class TestWorker(Worker): prefetch=2, timeout=1, ) - self._worker = _WorkerThread( + self._worker = WorkerThread( broker=self.broker, consumers=self.consumers, work_queue=self.work_queue, @@ -78,17 +78,18 @@ def use_test_broker(): actor.broker = broker actor.broker.declare_actor(actor) - for middleware_class, middleware_kwargs in Conf().middlewares: - middleware: Middleware = import_string(middleware_class)( + for middleware_class_path, middleware_kwargs in Conf().middlewares: + middleware_class = import_string(middleware_class_path) + if issubclass(middleware_class, Results): + middleware_kwargs["backend"] = import_string(Conf().result_backend)( + *Conf().result_backend_args, + **Conf().result_backend_kwargs, + ) + middleware: Middleware = middleware_class( **middleware_kwargs, ) if isinstance(middleware, Retries): middleware.max_retries = 0 - if isinstance(middleware, Results): - middleware.backend = import_string(Conf().result_backend)( - *Conf().result_backend_args, - **Conf().result_backend_kwargs, - ) broker.add_middleware(middleware) broker.start() diff --git a/lifecycle/worker_process.py b/lifecycle/worker_process.py index c37f012f92..ac6110e92c 100755 --- a/lifecycle/worker_process.py +++ b/lifecycle/worker_process.py @@ -28,12 +28,7 @@ class HttpHandler(BaseHTTPRequestHandler): _ = db_conn.cursor() def do_GET(self): - from django.db import ( - DatabaseError, - InterfaceError, - OperationalError, - connections, - ) + from django.db import DatabaseError, InterfaceError, OperationalError, connections from psycopg.errors import AdminShutdown from authentik.root.monitoring import monitoring_set @@ -42,7 +37,6 @@ class HttpHandler(BaseHTTPRequestHandler): AdminShutdown, InterfaceError, DatabaseError, - ConnectionError, OperationalError, ) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py index 24a9ddffd0..bfb6c05a7a 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/apps.py @@ -32,16 +32,17 @@ class DjangoDramatiqPostgres(AppConfig): middleware=[], ) - for middleware_class, middleware_kwargs in Conf().middlewares: - middleware: dramatiq.middleware.middleware.Middleware = import_string(middleware_class)( - **middleware_kwargs, - ) - if isinstance(middleware, Results): - middleware.backend = import_string(Conf().result_backend)( + for middleware_class_path, middleware_kwargs in Conf().middlewares: + middleware_class = import_string(middleware_class_path) + if issubclass(middleware_class, Results): + middleware_kwargs["backend"] = import_string(Conf().result_backend)( *Conf().result_backend_args, **Conf().result_backend_kwargs, ) - broker.add_middleware(middleware) # type: ignore[no-untyped-call] + middleware: dramatiq.middleware.middleware.Middleware = middleware_class( + **middleware_kwargs, + ) + broker.add_middleware(middleware) dramatiq.set_broker(broker) diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 07bde403cb..3d0314df06 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -23,11 +23,9 @@ from django.utils.functional import cached_property from django.utils.module_loading import import_string from dramatiq.broker import Broker, Consumer, MessageProxy from dramatiq.common import compute_backoff, current_millis, dq_name, q_name, xq_name -from dramatiq.errors import ConnectionError, QueueJoinTimeout +from dramatiq.errors import BrokerConnectionError, QueueJoinTimeout from dramatiq.message import Message -from dramatiq.middleware import ( - Middleware, -) +from dramatiq.middleware import Middleware from pglock.core import _cast_lock_id from psycopg import sql from psycopg.errors import AdminShutdown @@ -46,7 +44,6 @@ DATABASE_ERRORS = ( AdminShutdown, InterfaceError, DatabaseError, - ConnectionError, OperationalError, ) @@ -55,7 +52,7 @@ def channel_name(queue_name: str, identifier: ChannelIdentifier) -> str: return f"{CHANNEL_PREFIX}.{queue_name}.{identifier.value}" -def raise_connection_error(func: Callable[P, R]) -> Callable[P, R]: # noqa: UP047 +def raise_broker_connection_error(func: Callable[P, R]) -> Callable[P, R]: # noqa: UP047 @functools.wraps(func) def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: try: @@ -66,13 +63,13 @@ def raise_connection_error(func: Callable[P, R]) -> Callable[P, R]: # noqa: UP0 connections.close_all() except DATABASE_ERRORS: pass - raise ConnectionError(str(exc)) from exc # type: ignore[no-untyped-call] + raise BrokerConnectionError(str(exc)) from exc # type: ignore[no-untyped-call] return wrapper class PostgresBroker(Broker): - queues: set[str] # type: ignore[assignment] + queues: set[str] def __init__( self, @@ -81,7 +78,7 @@ class PostgresBroker(Broker): db_alias: str = DEFAULT_DB_ALIAS, **kwargs: Any, ) -> None: - super().__init__(*args, middleware=[], **kwargs) # type: ignore[no-untyped-call,misc] + super().__init__(*args, middleware=[], **kwargs) # type: ignore[misc] self.logger = get_logger(__name__, type(self)) self.queues = set() @@ -122,10 +119,10 @@ class PostgresBroker(Broker): def declare_queue(self, queue_name: str) -> None: if queue_name not in self.queues: - self.emit_before("declare_queue", queue_name) # type: ignore[no-untyped-call] + self.emit_before("declare_queue", queue_name) self.queues.add(queue_name) # Nothing more to do, all queues are in the same table - self.emit_after("declare_queue", queue_name) # type: ignore[no-untyped-call] + self.emit_after("declare_queue", queue_name) def model_defaults(self, message: Message[Any]) -> dict[str, Any]: eta = None @@ -141,7 +138,7 @@ class PostgresBroker(Broker): } @tenacity.retry( - retry=tenacity.retry_if_exception_type(ConnectionError), + retry=tenacity.retry_if_exception_type(BrokerConnectionError), reraise=True, wait=tenacity.wait_random_exponential(multiplier=1, max=5), stop=tenacity.stop_after_attempt(3), @@ -149,11 +146,11 @@ class PostgresBroker(Broker): cast(logging.Logger, logger), logging.INFO, exc_info=True ), ) - @raise_connection_error + @raise_broker_connection_error def enqueue(self, message: Message[Any], *, delay: int | None = None) -> Message[Any]: - queue_name = q_name(message.queue_name) # type: ignore[no-untyped-call] + queue_name = q_name(message.queue_name) if delay: - message_eta = current_millis() + delay # type: ignore[no-untyped-call] + message_eta = current_millis() + delay message.options["eta"] = message_eta self.declare_queue(queue_name) @@ -163,7 +160,7 @@ class PostgresBroker(Broker): message.options["model_defaults"] = self.model_defaults(message) message.options["model_create_defaults"] = {} - self.emit_before("enqueue", message, delay) # type: ignore[no-untyped-call] + self.emit_before("enqueue", message, delay) with transaction.atomic(using=self.db_alias): query = { @@ -185,7 +182,7 @@ class PostgresBroker(Broker): message.options["task"] = task message.options["task_created"] = created - self.emit_after("enqueue", message, delay) # type: ignore[no-untyped-call] + self.emit_after("enqueue", message, delay) return message def get_declared_queues(self) -> set[str]: @@ -193,7 +190,7 @@ class PostgresBroker(Broker): def flush(self, queue_name: str) -> None: self.query_set.filter( - queue_name__in=(queue_name, dq_name(queue_name), xq_name(queue_name)) # type: ignore[no-untyped-call] + queue_name__in=(queue_name, dq_name(queue_name), xq_name(queue_name)) ).delete() def flush_all(self) -> None: @@ -375,7 +372,7 @@ class _PostgresConsumer(Consumer): self.in_processing.add(str(message_id)) return message - @raise_connection_error + @raise_broker_connection_error def __next__(self) -> MessageProxy | None: # This method is called every second @@ -395,7 +392,7 @@ class _PostgresConsumer(Consumer): if processing >= self.prefetch: # If we have too many messages already processing, wait and don't consume a message # straight away, other workers will be faster. - self.misses, backoff_ms = compute_backoff(self.misses, max_backoff=1000) # type: ignore[no-untyped-call] + self.misses, backoff_ms = compute_backoff(self.misses, max_backoff=1000) self.logger.debug( "Too many messages in processing, Sleeping", processing=processing, @@ -420,7 +417,7 @@ class _PostgresConsumer(Consumer): break message = self._consume_one(str(message_id)) if message is not None: - return MessageProxy(message) # type: ignore[no-untyped-call] + return MessageProxy(message) else: self.logger.debug("Message already consumed. Skipping.", message_id=message_id) continue @@ -444,7 +441,7 @@ class _PostgresConsumer(Consumer): self.to_unlock.add(str(message_id)) return False - def _post_process_message(self, message: Message[Any], state: TaskState) -> None: + def _post_process_message(self, message: MessageProxy, state: TaskState) -> None: self.logger.debug("Post-processing message", message=message.message_id, state=state) try: self.in_processing.remove(str(message.message_id)) @@ -466,16 +463,16 @@ class _PostgresConsumer(Consumer): ) message.options["task"] = task - @raise_connection_error - def ack(self, message: Message[Any]) -> None: + @raise_broker_connection_error + def ack(self, message: MessageProxy) -> None: self._post_process_message(message, TaskState.DONE) - @raise_connection_error - def nack(self, message: Message[Any]) -> None: + @raise_broker_connection_error + def nack(self, message: MessageProxy) -> None: self._post_process_message(message, TaskState.REJECTED) - @raise_connection_error - def requeue(self, messages: Iterable[Message[Any]]) -> None: + @raise_broker_connection_error + def requeue(self, messages: Iterable[MessageProxy]) -> None: self.query_set.filter( message_id__in=[message.message_id for message in messages], ).update( @@ -514,7 +511,7 @@ class _PostgresConsumer(Consumer): self.logger.info("Purged messages in all queues", count=count) self.task_purge_last_run = timezone.now() - @raise_connection_error + @raise_broker_connection_error def close(self) -> None: try: self._purge_locks() diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/forks.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/forks.py index b2195c104c..9bdc167da4 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/forks.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/forks.py @@ -5,7 +5,7 @@ from signal import pause from django_dramatiq_postgres.conf import Conf -def worker_metrics() -> None: +def worker_metrics() -> int: import_module(Conf().autodiscovery["setup_module"]) from django_dramatiq_postgres.middleware import MetricsMiddleware @@ -15,3 +15,4 @@ def worker_metrics() -> None: int(os.getenv("dramatiq_prom_port", "9191")), ) pause() + return 0 diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py index a128921b1e..372d65957a 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/middleware.py @@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, cast from django.db import DatabaseError, close_old_connections, connections from dramatiq.actor import Actor -from dramatiq.broker import Broker +from dramatiq.broker import Broker, MessageProxy from dramatiq.common import current_millis from dramatiq.message import Message from dramatiq.middleware.middleware import Middleware @@ -79,7 +79,7 @@ class DbConnectionMiddleware(Middleware): class TaskStateBeforeMiddleware(Middleware): - def before_process_message(self, broker: PostgresBroker, message: Message[Any]) -> None: + def before_process_message(self, broker: PostgresBroker, message: Message[Any]) -> None: # type: ignore[override] broker.query_set.filter( message_id=message.message_id, queue_name=message.queue_name, @@ -90,7 +90,7 @@ class TaskStateBeforeMiddleware(Middleware): class TaskStateAfterMiddleware(Middleware): - def before_process_message(self, broker: PostgresBroker, message: Message[Any]) -> None: + def before_process_message(self, broker: PostgresBroker, message: MessageProxy) -> None: # type: ignore[override] broker.query_set.filter( message_id=message.message_id, queue_name=message.queue_name, @@ -99,7 +99,7 @@ class TaskStateAfterMiddleware(Middleware): state=TaskState.RUNNING, ) - def after_skip_message(self, broker: PostgresBroker, message: Message[Any]) -> None: + def after_skip_message(self, broker: PostgresBroker, message: MessageProxy) -> None: # type: ignore[override] broker.query_set.filter( message_id=message.message_id, queue_name=message.queue_name, @@ -110,11 +110,11 @@ class TaskStateAfterMiddleware(Middleware): def after_process_message( self, - broker: PostgresBroker, - message: Message[Any], + broker: PostgresBroker, # type: ignore[override] + message: MessageProxy, *, result: Any | None = None, - exception: Exception | None = None, + exception: BaseException | None = None, ) -> None: self.after_skip_message(broker, message) @@ -147,7 +147,7 @@ class CurrentTask(Middleware): raise CurrentTaskNotFound() return task[-1] - def before_process_message(self, broker: Broker, message: Message[Any]) -> None: + def before_process_message(self, broker: Broker, message: MessageProxy) -> None: tasks = self._TASKS.get() if tasks is None: tasks = [] @@ -157,10 +157,10 @@ class CurrentTask(Middleware): def after_process_message( self, broker: Broker, - message: Message[Any], + message: MessageProxy, *, result: Any | None = None, - exception: Exception | None = None, + exception: BaseException | None = None, ) -> None: tasks: list[TaskBase] | None = self._TASKS.get() if tasks is None or len(tasks) == 0: @@ -194,7 +194,7 @@ class CurrentTask(Middleware): pass self._TASKS.set(tasks[:-1]) - def after_skip_message(self, broker: Broker, message: Message[Any]) -> None: + def after_skip_message(self, broker: Broker, message: MessageProxy) -> None: self.after_process_message(broker, message) @@ -236,7 +236,7 @@ class MetricsMiddleware(Middleware): self.message_start_times: dict[str, int] = {} @property - def forks(self) -> list[Callable[[], None]]: + def forks(self) -> list[Callable[[], int]]: from django_dramatiq_postgres.forks import worker_metrics return [worker_metrics] @@ -310,41 +310,41 @@ class MetricsMiddleware(Middleware): # TODO: worker_id multiprocess.mark_process_dead(os.getpid()) # type: ignore[no-untyped-call] - def _make_labels(self, message: Message[Any]) -> list[str]: + def _make_labels(self, message: MessageProxy | Message[Any]) -> list[str]: return [message.queue_name, message.actor_name] - def after_nack(self, broker: Broker, message: Message[Any]) -> None: + def after_nack(self, broker: Broker, message: MessageProxy) -> None: self.total_rejected_messages.labels(*self._make_labels(message)).inc() def after_enqueue(self, broker: Broker, message: Message[Any], delay: int) -> None: if "retries" in message.options: self.total_retried_messages.labels(*self._make_labels(message)).inc() - def before_delay_message(self, broker: Broker, message: Message[Any]) -> None: + def before_delay_message(self, broker: Broker, message: MessageProxy) -> None: self.delayed_messages.add(message.message_id) self.in_progress_delayed_messages.labels(*self._make_labels(message)).inc() - def before_process_message(self, broker: Broker, message: Message[Any]) -> None: + def before_process_message(self, broker: Broker, message: MessageProxy) -> None: labels = self._make_labels(message) if message.message_id in self.delayed_messages: self.delayed_messages.remove(message.message_id) self.in_progress_delayed_messages.labels(*labels).dec() self.in_progress_messages.labels(*labels).inc() - self.message_start_times[message.message_id] = current_millis() # type: ignore[no-untyped-call] + self.message_start_times[message.message_id] = current_millis() def after_process_message( self, broker: Broker, - message: Message[Any], + message: MessageProxy, *, result: Any | None = None, - exception: Exception | None = None, + exception: BaseException | None = None, ) -> None: labels = self._make_labels(message) - message_start_time = self.message_start_times.pop(message.message_id, current_millis()) # type: ignore[no-untyped-call] - message_duration = current_millis() - message_start_time # type: ignore[no-untyped-call] + message_start_time = self.message_start_times.pop(message.message_id, current_millis()) + message_duration = current_millis() - message_start_time self.messages_durations.labels(*labels).observe(message_duration) self.in_progress_messages.labels(*labels).dec() diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py index c54f79549f..df8301219c 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/models.py @@ -159,7 +159,7 @@ class ScheduleBase(models.Model): def send(self, broker: Broker | None = None) -> Message[Any]: broker = broker or get_broker() - actor: Actor[Any, Any] = broker.get_actor(self.actor_name) # type: ignore[no-untyped-call] + actor: Actor[Any, Any] = broker.get_actor(self.actor_name) return actor.send_with_options( args=pickle.loads(self.args), # nosec kwargs=pickle.loads(self.kwargs), # nosec diff --git a/packages/django-dramatiq-postgres/pyproject.toml b/packages/django-dramatiq-postgres/pyproject.toml index f9e58a3b77..de34d9891b 100644 --- a/packages/django-dramatiq-postgres/pyproject.toml +++ b/packages/django-dramatiq-postgres/pyproject.toml @@ -36,7 +36,7 @@ dependencies = [ "django >=4.2,<6.0", "django-pglock >=1.7,<2", "django-pgtrigger >=4,<5", - "dramatiq >=1.17,<1.18", + "dramatiq >=2,<3", "tenacity >=9,<10", "structlog >=25,<26", ] diff --git a/uv.lock b/uv.lock index df40686078..a6e97e3c79 100644 --- a/uv.lock +++ b/uv.lock @@ -1143,7 +1143,7 @@ requires-dist = [ { name = "django", specifier = ">=4.2,<6.0" }, { name = "django-pglock", specifier = ">=1.7,<2" }, { name = "django-pgtrigger", specifier = ">=4,<5" }, - { name = "dramatiq", specifier = ">=1.17,<1.18" }, + { name = "dramatiq", specifier = ">=2,<3" }, { name = "structlog", specifier = ">=25,<26" }, { name = "tenacity", specifier = ">=9,<10" }, ] @@ -1381,14 +1381,11 @@ wheels = [ [[package]] name = "dramatiq" -version = "1.17.1" +version = "2.1.0" source = { registry = "https://pypi.org/simple" } -dependencies = [ - { name = "prometheus-client" }, -] -sdist = { url = "https://files.pythonhosted.org/packages/c6/7a/6792ddc64a77d22bfd97261b751a7a76cf2f9d62edc59aafb679ac48b77d/dramatiq-1.17.1.tar.gz", hash = "sha256:2675d2f57e0d82db3a7d2a60f1f9c536365349db78c7f8d80a63e4c54697647a", size = 99071, upload-time = "2024-10-26T05:09:28.283Z" } +sdist = { url = "https://files.pythonhosted.org/packages/22/69/02b54e3fc4fe75721b322bc578054b4f03cec258ba614fa98a1a5bbe1efe/dramatiq-2.1.0.tar.gz", hash = "sha256:cf81550729de6cf64234b05bd63970645654aaf38967faa7a2b6e401384bb090", size = 105444, upload-time = "2026-03-03T11:22:10.067Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/ee/36/925c7afd5db4f1a3f00676b9c3c58f31ff7ae29a347282d86c8d429280a5/dramatiq-1.17.1-py3-none-any.whl", hash = "sha256:951cdc334478dff8e5150bb02a6f7a947d215ee24b5aedaf738eff20e17913df", size = 120382, upload-time = "2024-10-26T05:09:26.436Z" }, + { url = "https://files.pythonhosted.org/packages/c2/91/422960c8c415fd31ca1519d71d6f7e4bcabb2cdcc5872f784467e9fe7237/dramatiq-2.1.0-py3-none-any.whl", hash = "sha256:3ef940c2815722d3679aed79ef96c805f02fd33d4361529b2de30f01511ca44d", size = 125543, upload-time = "2026-03-03T11:22:08.664Z" }, ] [[package]]