diff --git a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py index 3d0314df06..f617064601 100644 --- a/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py +++ b/packages/django-dramatiq-postgres/django_dramatiq_postgres/broker.py @@ -23,7 +23,7 @@ from django.utils.functional import cached_property from django.utils.module_loading import import_string from dramatiq.broker import Broker, Consumer, MessageProxy from dramatiq.common import compute_backoff, current_millis, dq_name, q_name, xq_name -from dramatiq.errors import BrokerConnectionError, QueueJoinTimeout +from dramatiq.errors import BrokerConnectionError, DecodeError, QueueJoinTimeout from dramatiq.message import Message from dramatiq.middleware import Middleware from pglock.core import _cast_lock_id @@ -367,7 +367,18 @@ class _PostgresConsumer(Consumer): ) if task is None: return None - message = Message.decode(cast(bytes, task.message)) + try: + message = Message.decode(cast(bytes, task.message)) + except DecodeError: + self.logger.error( + "Failed to decode task, rejecting", queue=self.queue_name, message_id=message_id + ) + self.query_set.filter(message_id=message_id, queue_name=self.queue_name).update( + state=TaskState.REJECTED, + mtime=timezone.now(), + eta=None, + ) + return None message.options["task"] = task self.in_processing.add(str(message_id)) return message