Consumer gets stuck on ack #608

Open
kimdre opened this issue Dec 20, 2023 · 4 comments

Comments

kimdre commented Dec 20, 2023

I'm trying to run a consumer in a Docker Swarm cluster.
The app consumes multiple events from a queue, writes them to a database in bulk, and then acknowledges them all.

For some reason, however, the consumer often gets stuck while acknowledging the messages, even though the connection stays open. This happens especially when the app consumes a lot of messages at once, and it also consumes far fewer messages than it could.

Simply restarting the consumer doesn't fix it either: it still won't consume any messages (except for 1 to 4 messages, then it gets stuck again).
To fix it, I first have to delete the queue with all the messages in it, and only then can I restart the consumer app.

When I run this exact setup in a normal Docker stack on my computer, without Swarm enabled, it works fine for some reason.

app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.117] [utils.py.save_events:54] INFO: Committing 17 events to database.
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.131] [utils.py.save_events:57] DEBUG: Committed events to database successfully.
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.132] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00d\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.132] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00e\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.133] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00f\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.133] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00g\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.133] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00h\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.134] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00i\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.134] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00j\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.135] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00k\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.135] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00l\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.135] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00m\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.136] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00n\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.136] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00o\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.137] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00p\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.137] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00q\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.137] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00r\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.138] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00s\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:30.138] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x01\x00\x01\x00\x00\x00\r\x00<\x00P\x00\x00\x00\x00\x00\x00\x00t\x00\xce', should_close=False, drain_future=<Future pending cb=[FutureStore.__on_task_done.<locals>.remover() at /usr/local/lib/python3.12/site-packages/aiormq/base.py:33, <1 more>, Task.task_wakeup()]>)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:30:44.855] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:31:14.857] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:31:44.857] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:32:14.859] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:32:44.860] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:33:14.861] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:33:44.863] [connection.py.__writer:715] DEBUG: Prepare to send ChannelFrame(payload=b'\x08\x00\x00\x00\x00\x00\x00\xce', should_close=False, drain_future=None)
app_processor.1.zj06uz2v1zi4@worker-1    | [2023-12-20 23:33:44.863] [connection.py.__reader:668] WARNING: Server connection <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7f082d6ed4f0> was stuck. No frames were received in 183 seconds.
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.623] [connection.py.__writer:751] DEBUG: Writer exited for <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7efd0b70d540>
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.623] [connection.py._on_reader_done:536] DEBUG: Reader exited for <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7efd0b70d540>
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.624] [connection.py._on_close:783] DEBUG: Closing connection <Connection: "amqp://user:******@rabbitmq:5672/app" at 0x7efd0b70d540> cause: CancelledError()
app_processor.1.lybdbcd90psc@worker-1    | [2023-12-20 23:53:02.624] [robust_connection.py._on_connection_close:83] INFO: Connection to amqp://user:******@rabbitmq:5672/app closed. Reconnecting after 5 seconds.

The code looks like this:

# save_events() bulk-inserts the consumed messages into the database;
# the messages are only acked after the insert has succeeded.
if await save_events(messages):
    for message in message_list_chunk:
        await message.ack()
else:
    self.failed_messages.extend(messages)
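
For reference, here is a stripped-down sketch of how the consumer loop around that fragment is set up (the URL, queue name, batch size and the save_events stub are simplified placeholders, not the exact production code):

import asyncio
import aio_pika

BATCH_SIZE = 50  # illustrative; the real chunk size varies


async def save_events(batch) -> bool:
    # Placeholder for the bulk database insert; returns True on success.
    return True


async def main() -> None:
    # connect_robust reconnects automatically, which matches the
    # "Reconnecting after 5 seconds." lines in the log above.
    connection = await aio_pika.connect_robust("amqp://user:password@rabbitmq:5672/app")
    async with connection:
        channel = await connection.channel()
        # Cap unacked deliveries so the broker stops pushing once a batch is in flight.
        await channel.set_qos(prefetch_count=BATCH_SIZE)
        queue = await channel.declare_queue("events", durable=True)

        batch = []
        async with queue.iterator() as messages:
            async for message in messages:
                batch.append(message)
                if len(batch) < BATCH_SIZE:
                    continue
                if await save_events(batch):
                    for m in batch:
                        await m.ack()  # this is the call that hangs
                else:
                    # keep failed messages for a retry instead of acking them
                    pass
                batch = []


asyncio.run(main())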

kimdre commented Jan 7, 2024

Update: the issue only occurs in a Docker Swarm cluster. Using plain Docker or a standalone RabbitMQ instance works fine.

@Elli-Rid

Hey @kimdre, did you notice whether messages would start building up after the consumer(s) got stuck? Also, is there a specific message size above which the issue consistently occurs?

kimdre commented Jan 31, 2024

Hi, yes, the publisher still worked, so new messages were building up in the queue while the consumers didn't do anything despite being connected.
The size/amount was pretty random; unfortunately I could not find a common pattern there.

Elli-Rid commented Mar 18, 2024

I've seen something similar that was caused by an MTU size mismatch along the networking route: once a message larger than 8500 bytes arrived, the RabbitMQ connection got stuck forever. The solution was to configure a proper peering connection, as it was purely an infrastructure thing (AWS). This AWS doc describes well what was happening: https://docs.aws.amazon.com/vpc/latest/tgw/transit-gateway-quotas.html#mtu-quotas

The MTU of a network connection is the size, in bytes, of the largest permissible packet that can be passed over the connection. The larger the MTU of a connection, the more data that can be passed in a single packet. A transit gateway supports an MTU of 8500 bytes for traffic between VPCs, AWS Direct Connect, Transit Gateway Connect, and peering attachments. Traffic over VPN connections can have an MTU of 1500 bytes.

When migrating from VPC peering to use a transit gateway, an MTU size mismatch between VPC peering and the transit gateway might result in some asymmetric traffic packets dropping. Update both VPCs at the same time to avoid jumbo packets dropping due to a size mismatch.

Packets with a size larger than 8500 bytes that arrive at the transit gateway are dropped.
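
If you want to rule that out on your side, one rough way to check is to compare the MTU the containers actually see on the overlay network with the MTU of the underlying links. Something like this from inside the consumer container (Linux only; interface names and values are just examples, nothing specific to your setup):

# Print the MTU of every network interface visible inside the container.
# If the overlay interface advertises a larger MTU than the physical path
# actually supports, large frames can be silently dropped in transit.
from pathlib import Path

for iface in sorted(Path("/sys/class/net").iterdir()):
    mtu = (iface / "mtu").read_text().strip()
    print(f"{iface.name}: MTU {mtu}")

If those numbers disagree along the path, lowering the MTU on the Docker overlay network (or fixing the peering, as in our case) is the usual fix.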
