diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 752c71c77d..7aa868f77f 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -364,16 +364,15 @@ class _PostgresConsumer(Consumer): @raise_connection_error def __next__(self) -> MessageProxy | None: # This method is called every second - notifies = [] # If we don't have a connection yet, fetch missed notifications from the table directly - if self._listen_connection is None: + if self._listen_connection is None and not self.notifies: # We might miss a notification between the initial query and the first time we wait for # notifications, it doesn't matter because we re-fetch for missed messages later on. - notifies = self._fetch_pending_notifies() + self.notifies = self._fetch_pending_notifies() self.logger.debug( "Found pending messages in queue", - notifies=len(notifies), + notifies=len(self.notifies), queue=self.queue_name, ) # Force creation of listen connection @@ -391,15 +390,15 @@ class _PostgresConsumer(Consumer): time.sleep(backoff_ms / 1000) return None - if not notifies: - notifies = self._poll_for_notify() + if not self.notifies: + self.notifies += self._poll_for_notify() - if not notifies: - notifies = self._fetch_pending_notifies() + if not self.notifies: + self.notifies += self._fetch_pending_notifies() # If we have some notifies, loop to find one to do - while notifies: - notify = notifies.pop(0) + while self.notifies: + notify = self.notifies.pop(0) task: TaskBase | None = ( self.query_set.defer(None).defer("result").filter(message_id=notify.payload).first() )