diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 5554c5036f..459a266a27 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -259,6 +259,7 @@ class _PostgresConsumer(Consumer): if Conf().schedule_model: self.scheduler = import_string(Conf().scheduler_class)() self.scheduler.broker = self.broker + self.scheduler.db_alias = self.db_alias self.scheduler_interval = timedelta(seconds=Conf().scheduler_interval) self.scheduler_last_run = timezone.now() - self.scheduler_interval diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py index baad352c04..d36fb335ed 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/scheduler.py @@ -1,7 +1,6 @@ from typing import Any, cast import pglock -from django.db import router, transaction from django.db.models import QuerySet from django.utils.functional import cached_property from django.utils.module_loading import import_string @@ -10,11 +9,12 @@ from dramatiq.broker import Broker from structlog.stdlib import get_logger from django_dramatiq_postgres.conf import Conf -from django_dramatiq_postgres.models import ScheduleBase +from django_dramatiq_postgres.models import ScheduleBase, TaskState class Scheduler: broker: Broker + db_alias: str def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) @@ -28,12 +28,12 @@ class Scheduler: @property def query_set(self) -> QuerySet[ScheduleBase]: - return self.model._default_manager.filter(paused=False) + return self.model._default_manager.using(self.db_alias).filter(paused=False) def process_schedule(self, schedule: ScheduleBase) -> None: schedule.next_run = schedule.compute_next_run() schedule.send(self.broker) - schedule.save() + schedule.save(update_fields=["next_run"]) def _lock(self) -> pglock.advisory: return pglock.advisory( @@ -44,12 +44,16 @@ class Scheduler: def _run(self) -> int: count = 0 - with transaction.atomic(using=router.db_for_write(self.model)): - for schedule in self.query_set.select_for_update().filter( - next_run__lt=now(), + for schedule in self.query_set.filter(next_run__lt=now()): + if ( + schedule.tasks.using(self.db_alias) # type: ignore[attr-defined] + .exclude(state__in=(TaskState.DONE, TaskState.REJECTED)) + .exists() ): - self.process_schedule(schedule) - count += 1 + self.logger.debug("Skipping schedule, tasks already exists", schedule=schedule) + continue + self.process_schedule(schedule) + count += 1 return count def run(self) -> int: