packages/django-dramatiq-postgres: scheduler: only dispatch tasks if they're not running yet (#20921)

* packages/django-dramatiq-postgres: scheduler: only dispatch tasks if they're not running yet

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>

* lint

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>

---------

Signed-off-by: Marc 'risson' Schmitt <marc.schmitt@risson.space>
This commit is contained in:
Marc 'risson' Schmitt
2026-03-17 11:14:16 +00:00
committed by GitHub
parent c2445d6f9b
commit 57b2984f74
2 changed files with 14 additions and 9 deletions
@@ -259,6 +259,7 @@ class _PostgresConsumer(Consumer):
if Conf().schedule_model:
self.scheduler = import_string(Conf().scheduler_class)()
self.scheduler.broker = self.broker
self.scheduler.db_alias = self.db_alias
self.scheduler_interval = timedelta(seconds=Conf().scheduler_interval)
self.scheduler_last_run = timezone.now() - self.scheduler_interval
@@ -1,7 +1,6 @@
from typing import Any, cast
import pglock
from django.db import router, transaction
from django.db.models import QuerySet
from django.utils.functional import cached_property
from django.utils.module_loading import import_string
@@ -10,11 +9,12 @@ from dramatiq.broker import Broker
from structlog.stdlib import get_logger
from django_dramatiq_postgres.conf import Conf
from django_dramatiq_postgres.models import ScheduleBase
from django_dramatiq_postgres.models import ScheduleBase, TaskState
class Scheduler:
broker: Broker
db_alias: str
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
@@ -28,12 +28,12 @@ class Scheduler:
@property
def query_set(self) -> QuerySet[ScheduleBase]:
return self.model._default_manager.filter(paused=False)
return self.model._default_manager.using(self.db_alias).filter(paused=False)
def process_schedule(self, schedule: ScheduleBase) -> None:
schedule.next_run = schedule.compute_next_run()
schedule.send(self.broker)
schedule.save()
schedule.save(update_fields=["next_run"])
def _lock(self) -> pglock.advisory:
return pglock.advisory(
@@ -44,12 +44,16 @@ class Scheduler:
def _run(self) -> int:
count = 0
with transaction.atomic(using=router.db_for_write(self.model)):
for schedule in self.query_set.select_for_update().filter(
next_run__lt=now(),
for schedule in self.query_set.filter(next_run__lt=now()):
if (
schedule.tasks.using(self.db_alias) # type: ignore[attr-defined]
.exclude(state__in=(TaskState.DONE, TaskState.REJECTED))
.exists()
):
self.process_schedule(schedule)
count += 1
self.logger.debug("Skipping schedule, tasks already exists", schedule=schedule)
continue
self.process_schedule(schedule)
count += 1
return count
def run(self) -> int: