core: bump dramatiq from 1.17.1 to 2.1.0 (#22076)

Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
dependabot[bot]
2026-05-06 14:42:29 +00:00
committed by GitHub
parent 1db6c3af8b
commit b32df17513
9 changed files with 75 additions and 84 deletions
+11 -10
View File
@@ -7,7 +7,7 @@ from dramatiq.broker import Broker, MessageProxy, get_broker
from dramatiq.middleware.middleware import Middleware
from dramatiq.middleware.retries import Retries
from dramatiq.results.middleware import Results
from dramatiq.worker import Worker, _ConsumerThread, _WorkerThread
from dramatiq.worker import ConsumerThread, Worker, WorkerThread
from authentik.tasks.broker import PostgresBroker
@@ -20,7 +20,7 @@ class TestWorker(Worker):
self.worker_id = 1000
self.work_queue = PriorityQueue()
self.consumers = {
TESTING_QUEUE: _ConsumerThread(
TESTING_QUEUE: ConsumerThread(
broker=self.broker,
queue_name=TESTING_QUEUE,
prefetch=2,
@@ -33,7 +33,7 @@ class TestWorker(Worker):
prefetch=2,
timeout=1,
)
self._worker = _WorkerThread(
self._worker = WorkerThread(
broker=self.broker,
consumers=self.consumers,
work_queue=self.work_queue,
@@ -78,17 +78,18 @@ def use_test_broker():
actor.broker = broker
actor.broker.declare_actor(actor)
for middleware_class, middleware_kwargs in Conf().middlewares:
middleware: Middleware = import_string(middleware_class)(
for middleware_class_path, middleware_kwargs in Conf().middlewares:
middleware_class = import_string(middleware_class_path)
if issubclass(middleware_class, Results):
middleware_kwargs["backend"] = import_string(Conf().result_backend)(
*Conf().result_backend_args,
**Conf().result_backend_kwargs,
)
middleware: Middleware = middleware_class(
**middleware_kwargs,
)
if isinstance(middleware, Retries):
middleware.max_retries = 0
if isinstance(middleware, Results):
middleware.backend = import_string(Conf().result_backend)(
*Conf().result_backend_args,
**Conf().result_backend_kwargs,
)
broker.add_middleware(middleware)
broker.start()
+1 -7
View File
@@ -28,12 +28,7 @@ class HttpHandler(BaseHTTPRequestHandler):
_ = db_conn.cursor()
def do_GET(self):
from django.db import (
DatabaseError,
InterfaceError,
OperationalError,
connections,
)
from django.db import DatabaseError, InterfaceError, OperationalError, connections
from psycopg.errors import AdminShutdown
from authentik.root.monitoring import monitoring_set
@@ -42,7 +37,6 @@ class HttpHandler(BaseHTTPRequestHandler):
AdminShutdown,
InterfaceError,
DatabaseError,
ConnectionError,
OperationalError,
)
@@ -32,16 +32,17 @@ class DjangoDramatiqPostgres(AppConfig):
middleware=[],
)
for middleware_class, middleware_kwargs in Conf().middlewares:
middleware: dramatiq.middleware.middleware.Middleware = import_string(middleware_class)(
**middleware_kwargs,
)
if isinstance(middleware, Results):
middleware.backend = import_string(Conf().result_backend)(
for middleware_class_path, middleware_kwargs in Conf().middlewares:
middleware_class = import_string(middleware_class_path)
if issubclass(middleware_class, Results):
middleware_kwargs["backend"] = import_string(Conf().result_backend)(
*Conf().result_backend_args,
**Conf().result_backend_kwargs,
)
broker.add_middleware(middleware) # type: ignore[no-untyped-call]
middleware: dramatiq.middleware.middleware.Middleware = middleware_class(
**middleware_kwargs,
)
broker.add_middleware(middleware)
dramatiq.set_broker(broker)
@@ -23,11 +23,9 @@ from django.utils.functional import cached_property
from django.utils.module_loading import import_string
from dramatiq.broker import Broker, Consumer, MessageProxy
from dramatiq.common import compute_backoff, current_millis, dq_name, q_name, xq_name
from dramatiq.errors import ConnectionError, QueueJoinTimeout
from dramatiq.errors import BrokerConnectionError, QueueJoinTimeout
from dramatiq.message import Message
from dramatiq.middleware import (
Middleware,
)
from dramatiq.middleware import Middleware
from pglock.core import _cast_lock_id
from psycopg import sql
from psycopg.errors import AdminShutdown
@@ -46,7 +44,6 @@ DATABASE_ERRORS = (
AdminShutdown,
InterfaceError,
DatabaseError,
ConnectionError,
OperationalError,
)
@@ -55,7 +52,7 @@ def channel_name(queue_name: str, identifier: ChannelIdentifier) -> str:
return f"{CHANNEL_PREFIX}.{queue_name}.{identifier.value}"
def raise_connection_error(func: Callable[P, R]) -> Callable[P, R]: # noqa: UP047
def raise_broker_connection_error(func: Callable[P, R]) -> Callable[P, R]: # noqa: UP047
@functools.wraps(func)
def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
try:
@@ -66,13 +63,13 @@ def raise_connection_error(func: Callable[P, R]) -> Callable[P, R]: # noqa: UP0
connections.close_all()
except DATABASE_ERRORS:
pass
raise ConnectionError(str(exc)) from exc # type: ignore[no-untyped-call]
raise BrokerConnectionError(str(exc)) from exc # type: ignore[no-untyped-call]
return wrapper
class PostgresBroker(Broker):
queues: set[str] # type: ignore[assignment]
queues: set[str]
def __init__(
self,
@@ -81,7 +78,7 @@ class PostgresBroker(Broker):
db_alias: str = DEFAULT_DB_ALIAS,
**kwargs: Any,
) -> None:
super().__init__(*args, middleware=[], **kwargs) # type: ignore[no-untyped-call,misc]
super().__init__(*args, middleware=[], **kwargs) # type: ignore[misc]
self.logger = get_logger(__name__, type(self))
self.queues = set()
@@ -122,10 +119,10 @@ class PostgresBroker(Broker):
def declare_queue(self, queue_name: str) -> None:
if queue_name not in self.queues:
self.emit_before("declare_queue", queue_name) # type: ignore[no-untyped-call]
self.emit_before("declare_queue", queue_name)
self.queues.add(queue_name)
# Nothing more to do, all queues are in the same table
self.emit_after("declare_queue", queue_name) # type: ignore[no-untyped-call]
self.emit_after("declare_queue", queue_name)
def model_defaults(self, message: Message[Any]) -> dict[str, Any]:
eta = None
@@ -141,7 +138,7 @@ class PostgresBroker(Broker):
}
@tenacity.retry(
retry=tenacity.retry_if_exception_type(ConnectionError),
retry=tenacity.retry_if_exception_type(BrokerConnectionError),
reraise=True,
wait=tenacity.wait_random_exponential(multiplier=1, max=5),
stop=tenacity.stop_after_attempt(3),
@@ -149,11 +146,11 @@ class PostgresBroker(Broker):
cast(logging.Logger, logger), logging.INFO, exc_info=True
),
)
@raise_connection_error
@raise_broker_connection_error
def enqueue(self, message: Message[Any], *, delay: int | None = None) -> Message[Any]:
queue_name = q_name(message.queue_name) # type: ignore[no-untyped-call]
queue_name = q_name(message.queue_name)
if delay:
message_eta = current_millis() + delay # type: ignore[no-untyped-call]
message_eta = current_millis() + delay
message.options["eta"] = message_eta
self.declare_queue(queue_name)
@@ -163,7 +160,7 @@ class PostgresBroker(Broker):
message.options["model_defaults"] = self.model_defaults(message)
message.options["model_create_defaults"] = {}
self.emit_before("enqueue", message, delay) # type: ignore[no-untyped-call]
self.emit_before("enqueue", message, delay)
with transaction.atomic(using=self.db_alias):
query = {
@@ -185,7 +182,7 @@ class PostgresBroker(Broker):
message.options["task"] = task
message.options["task_created"] = created
self.emit_after("enqueue", message, delay) # type: ignore[no-untyped-call]
self.emit_after("enqueue", message, delay)
return message
def get_declared_queues(self) -> set[str]:
@@ -193,7 +190,7 @@ class PostgresBroker(Broker):
def flush(self, queue_name: str) -> None:
self.query_set.filter(
queue_name__in=(queue_name, dq_name(queue_name), xq_name(queue_name)) # type: ignore[no-untyped-call]
queue_name__in=(queue_name, dq_name(queue_name), xq_name(queue_name))
).delete()
def flush_all(self) -> None:
@@ -375,7 +372,7 @@ class _PostgresConsumer(Consumer):
self.in_processing.add(str(message_id))
return message
@raise_connection_error
@raise_broker_connection_error
def __next__(self) -> MessageProxy | None:
# This method is called every second
@@ -395,7 +392,7 @@ class _PostgresConsumer(Consumer):
if processing >= self.prefetch:
# If we have too many messages already processing, wait and don't consume a message
# straight away, other workers will be faster.
self.misses, backoff_ms = compute_backoff(self.misses, max_backoff=1000) # type: ignore[no-untyped-call]
self.misses, backoff_ms = compute_backoff(self.misses, max_backoff=1000)
self.logger.debug(
"Too many messages in processing, Sleeping",
processing=processing,
@@ -420,7 +417,7 @@ class _PostgresConsumer(Consumer):
break
message = self._consume_one(str(message_id))
if message is not None:
return MessageProxy(message) # type: ignore[no-untyped-call]
return MessageProxy(message)
else:
self.logger.debug("Message already consumed. Skipping.", message_id=message_id)
continue
@@ -444,7 +441,7 @@ class _PostgresConsumer(Consumer):
self.to_unlock.add(str(message_id))
return False
def _post_process_message(self, message: Message[Any], state: TaskState) -> None:
def _post_process_message(self, message: MessageProxy, state: TaskState) -> None:
self.logger.debug("Post-processing message", message=message.message_id, state=state)
try:
self.in_processing.remove(str(message.message_id))
@@ -466,16 +463,16 @@ class _PostgresConsumer(Consumer):
)
message.options["task"] = task
@raise_connection_error
def ack(self, message: Message[Any]) -> None:
@raise_broker_connection_error
def ack(self, message: MessageProxy) -> None:
self._post_process_message(message, TaskState.DONE)
@raise_connection_error
def nack(self, message: Message[Any]) -> None:
@raise_broker_connection_error
def nack(self, message: MessageProxy) -> None:
self._post_process_message(message, TaskState.REJECTED)
@raise_connection_error
def requeue(self, messages: Iterable[Message[Any]]) -> None:
@raise_broker_connection_error
def requeue(self, messages: Iterable[MessageProxy]) -> None:
self.query_set.filter(
message_id__in=[message.message_id for message in messages],
).update(
@@ -514,7 +511,7 @@ class _PostgresConsumer(Consumer):
self.logger.info("Purged messages in all queues", count=count)
self.task_purge_last_run = timezone.now()
@raise_connection_error
@raise_broker_connection_error
def close(self) -> None:
try:
self._purge_locks()
@@ -5,7 +5,7 @@ from signal import pause
from django_dramatiq_postgres.conf import Conf
def worker_metrics() -> None:
def worker_metrics() -> int:
import_module(Conf().autodiscovery["setup_module"])
from django_dramatiq_postgres.middleware import MetricsMiddleware
@@ -15,3 +15,4 @@ def worker_metrics() -> None:
int(os.getenv("dramatiq_prom_port", "9191")),
)
pause()
return 0
@@ -10,7 +10,7 @@ from typing import TYPE_CHECKING, Any, cast
from django.db import DatabaseError, close_old_connections, connections
from dramatiq.actor import Actor
from dramatiq.broker import Broker
from dramatiq.broker import Broker, MessageProxy
from dramatiq.common import current_millis
from dramatiq.message import Message
from dramatiq.middleware.middleware import Middleware
@@ -79,7 +79,7 @@ class DbConnectionMiddleware(Middleware):
class TaskStateBeforeMiddleware(Middleware):
def before_process_message(self, broker: PostgresBroker, message: Message[Any]) -> None:
def before_process_message(self, broker: PostgresBroker, message: Message[Any]) -> None: # type: ignore[override]
broker.query_set.filter(
message_id=message.message_id,
queue_name=message.queue_name,
@@ -90,7 +90,7 @@ class TaskStateBeforeMiddleware(Middleware):
class TaskStateAfterMiddleware(Middleware):
def before_process_message(self, broker: PostgresBroker, message: Message[Any]) -> None:
def before_process_message(self, broker: PostgresBroker, message: MessageProxy) -> None: # type: ignore[override]
broker.query_set.filter(
message_id=message.message_id,
queue_name=message.queue_name,
@@ -99,7 +99,7 @@ class TaskStateAfterMiddleware(Middleware):
state=TaskState.RUNNING,
)
def after_skip_message(self, broker: PostgresBroker, message: Message[Any]) -> None:
def after_skip_message(self, broker: PostgresBroker, message: MessageProxy) -> None: # type: ignore[override]
broker.query_set.filter(
message_id=message.message_id,
queue_name=message.queue_name,
@@ -110,11 +110,11 @@ class TaskStateAfterMiddleware(Middleware):
def after_process_message(
self,
broker: PostgresBroker,
message: Message[Any],
broker: PostgresBroker, # type: ignore[override]
message: MessageProxy,
*,
result: Any | None = None,
exception: Exception | None = None,
exception: BaseException | None = None,
) -> None:
self.after_skip_message(broker, message)
@@ -147,7 +147,7 @@ class CurrentTask(Middleware):
raise CurrentTaskNotFound()
return task[-1]
def before_process_message(self, broker: Broker, message: Message[Any]) -> None:
def before_process_message(self, broker: Broker, message: MessageProxy) -> None:
tasks = self._TASKS.get()
if tasks is None:
tasks = []
@@ -157,10 +157,10 @@ class CurrentTask(Middleware):
def after_process_message(
self,
broker: Broker,
message: Message[Any],
message: MessageProxy,
*,
result: Any | None = None,
exception: Exception | None = None,
exception: BaseException | None = None,
) -> None:
tasks: list[TaskBase] | None = self._TASKS.get()
if tasks is None or len(tasks) == 0:
@@ -194,7 +194,7 @@ class CurrentTask(Middleware):
pass
self._TASKS.set(tasks[:-1])
def after_skip_message(self, broker: Broker, message: Message[Any]) -> None:
def after_skip_message(self, broker: Broker, message: MessageProxy) -> None:
self.after_process_message(broker, message)
@@ -236,7 +236,7 @@ class MetricsMiddleware(Middleware):
self.message_start_times: dict[str, int] = {}
@property
def forks(self) -> list[Callable[[], None]]:
def forks(self) -> list[Callable[[], int]]:
from django_dramatiq_postgres.forks import worker_metrics
return [worker_metrics]
@@ -310,41 +310,41 @@ class MetricsMiddleware(Middleware):
# TODO: worker_id
multiprocess.mark_process_dead(os.getpid()) # type: ignore[no-untyped-call]
def _make_labels(self, message: Message[Any]) -> list[str]:
def _make_labels(self, message: MessageProxy | Message[Any]) -> list[str]:
return [message.queue_name, message.actor_name]
def after_nack(self, broker: Broker, message: Message[Any]) -> None:
def after_nack(self, broker: Broker, message: MessageProxy) -> None:
self.total_rejected_messages.labels(*self._make_labels(message)).inc()
def after_enqueue(self, broker: Broker, message: Message[Any], delay: int) -> None:
if "retries" in message.options:
self.total_retried_messages.labels(*self._make_labels(message)).inc()
def before_delay_message(self, broker: Broker, message: Message[Any]) -> None:
def before_delay_message(self, broker: Broker, message: MessageProxy) -> None:
self.delayed_messages.add(message.message_id)
self.in_progress_delayed_messages.labels(*self._make_labels(message)).inc()
def before_process_message(self, broker: Broker, message: Message[Any]) -> None:
def before_process_message(self, broker: Broker, message: MessageProxy) -> None:
labels = self._make_labels(message)
if message.message_id in self.delayed_messages:
self.delayed_messages.remove(message.message_id)
self.in_progress_delayed_messages.labels(*labels).dec()
self.in_progress_messages.labels(*labels).inc()
self.message_start_times[message.message_id] = current_millis() # type: ignore[no-untyped-call]
self.message_start_times[message.message_id] = current_millis()
def after_process_message(
self,
broker: Broker,
message: Message[Any],
message: MessageProxy,
*,
result: Any | None = None,
exception: Exception | None = None,
exception: BaseException | None = None,
) -> None:
labels = self._make_labels(message)
message_start_time = self.message_start_times.pop(message.message_id, current_millis()) # type: ignore[no-untyped-call]
message_duration = current_millis() - message_start_time # type: ignore[no-untyped-call]
message_start_time = self.message_start_times.pop(message.message_id, current_millis())
message_duration = current_millis() - message_start_time
self.messages_durations.labels(*labels).observe(message_duration)
self.in_progress_messages.labels(*labels).dec()
@@ -159,7 +159,7 @@ class ScheduleBase(models.Model):
def send(self, broker: Broker | None = None) -> Message[Any]:
broker = broker or get_broker()
actor: Actor[Any, Any] = broker.get_actor(self.actor_name) # type: ignore[no-untyped-call]
actor: Actor[Any, Any] = broker.get_actor(self.actor_name)
return actor.send_with_options(
args=pickle.loads(self.args), # nosec
kwargs=pickle.loads(self.kwargs), # nosec
@@ -36,7 +36,7 @@ dependencies = [
"django >=4.2,<6.0",
"django-pglock >=1.7,<2",
"django-pgtrigger >=4,<5",
"dramatiq >=1.17,<1.18",
"dramatiq >=2,<3",
"tenacity >=9,<10",
"structlog >=25,<26",
]
Generated
+4 -7
View File
@@ -1143,7 +1143,7 @@ requires-dist = [
{ name = "django", specifier = ">=4.2,<6.0" },
{ name = "django-pglock", specifier = ">=1.7,<2" },
{ name = "django-pgtrigger", specifier = ">=4,<5" },
{ name = "dramatiq", specifier = ">=1.17,<1.18" },
{ name = "dramatiq", specifier = ">=2,<3" },
{ name = "structlog", specifier = ">=25,<26" },
{ name = "tenacity", specifier = ">=9,<10" },
]
@@ -1381,14 +1381,11 @@ wheels = [
[[package]]
name = "dramatiq"
version = "1.17.1"
version = "2.1.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "prometheus-client" },
]
sdist = { url = "https://files.pythonhosted.org/packages/c6/7a/6792ddc64a77d22bfd97261b751a7a76cf2f9d62edc59aafb679ac48b77d/dramatiq-1.17.1.tar.gz", hash = "sha256:2675d2f57e0d82db3a7d2a60f1f9c536365349db78c7f8d80a63e4c54697647a", size = 99071, upload-time = "2024-10-26T05:09:28.283Z" }
sdist = { url = "https://files.pythonhosted.org/packages/22/69/02b54e3fc4fe75721b322bc578054b4f03cec258ba614fa98a1a5bbe1efe/dramatiq-2.1.0.tar.gz", hash = "sha256:cf81550729de6cf64234b05bd63970645654aaf38967faa7a2b6e401384bb090", size = 105444, upload-time = "2026-03-03T11:22:10.067Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/36/925c7afd5db4f1a3f00676b9c3c58f31ff7ae29a347282d86c8d429280a5/dramatiq-1.17.1-py3-none-any.whl", hash = "sha256:951cdc334478dff8e5150bb02a6f7a947d215ee24b5aedaf738eff20e17913df", size = 120382, upload-time = "2024-10-26T05:09:26.436Z" },
{ url = "https://files.pythonhosted.org/packages/c2/91/422960c8c415fd31ca1519d71d6f7e4bcabb2cdcc5872f784467e9fe7237/dramatiq-2.1.0-py3-none-any.whl", hash = "sha256:3ef940c2815722d3679aed79ef96c805f02fd33d4361529b2de30f01511ca44d", size = 125543, upload-time = "2026-03-03T11:22:08.664Z" },
]
[[package]]