packages/django-dramatiq-postgres: broker: remember previously fetched notifies (#16128)

This commit is contained in:
Marc 'risson' Schmitt
2025-08-12 14:57:45 +02:00
committed by GitHub
parent cf4dd24b6f
commit b68adec303
@@ -364,16 +364,15 @@ class _PostgresConsumer(Consumer):
@raise_connection_error
def __next__(self) -> MessageProxy | None:
# This method is called every second
notifies = []
# If we don't have a connection yet, fetch missed notifications from the table directly
if self._listen_connection is None:
if self._listen_connection is None and not self.notifies:
# We might miss a notification between the initial query and the first time we wait for
# notifications, it doesn't matter because we re-fetch for missed messages later on.
notifies = self._fetch_pending_notifies()
self.notifies = self._fetch_pending_notifies()
self.logger.debug(
"Found pending messages in queue",
notifies=len(notifies),
notifies=len(self.notifies),
queue=self.queue_name,
)
# Force creation of listen connection
@@ -391,15 +390,15 @@ class _PostgresConsumer(Consumer):
time.sleep(backoff_ms / 1000)
return None
if not notifies:
notifies = self._poll_for_notify()
if not self.notifies:
self.notifies += self._poll_for_notify()
if not notifies:
notifies = self._fetch_pending_notifies()
if not self.notifies:
self.notifies += self._fetch_pending_notifies()
# If we have some notifies, loop to find one to do
while notifies:
notify = notifies.pop(0)
while self.notifies:
notify = self.notifies.pop(0)
task: TaskBase | None = (
self.query_set.defer(None).defer("result").filter(message_id=notify.payload).first()
)