mirror of
https://github.com/goauthentik/authentik.git
synced 2026-06-17 19:09:11 +03:00
packages/django-dramatiq-postgres/broker: avoid task processing stopping on decode error (#22110)
This commit is contained in:
committed by
GitHub
parent
e50f093685
commit
b420e4fdbd
@@ -23,7 +23,7 @@ from django.utils.functional import cached_property
|
||||
from django.utils.module_loading import import_string
|
||||
from dramatiq.broker import Broker, Consumer, MessageProxy
|
||||
from dramatiq.common import compute_backoff, current_millis, dq_name, q_name, xq_name
|
||||
from dramatiq.errors import BrokerConnectionError, QueueJoinTimeout
|
||||
from dramatiq.errors import BrokerConnectionError, DecodeError, QueueJoinTimeout
|
||||
from dramatiq.message import Message
|
||||
from dramatiq.middleware import Middleware
|
||||
from pglock.core import _cast_lock_id
|
||||
@@ -367,7 +367,18 @@ class _PostgresConsumer(Consumer):
|
||||
)
|
||||
if task is None:
|
||||
return None
|
||||
message = Message.decode(cast(bytes, task.message))
|
||||
try:
|
||||
message = Message.decode(cast(bytes, task.message))
|
||||
except DecodeError:
|
||||
self.logger.error(
|
||||
"Failed to decode task, rejecting", queue=self.queue_name, message_id=message_id
|
||||
)
|
||||
self.query_set.filter(message_id=message_id, queue_name=self.queue_name).update(
|
||||
state=TaskState.REJECTED,
|
||||
mtime=timezone.now(),
|
||||
eta=None,
|
||||
)
|
||||
return None
|
||||
message.options["task"] = task
|
||||
self.in_processing.add(str(message_id))
|
||||
return message
|
||||
|
||||
Reference in New Issue
Block a user