RabbitMQ¶
The shortest explanation of RabbitMQ is the AMQP 0-9-1 Model in Brief:
Multiple publishers can publish messages to exchanges
An exchange routes the messages it receives to one or more queues, using rules called bindings
Multiple consumers can subscribe to one or more queues
It is recommended to be familiar with all RabbitMQ tutorials.
Development¶
Install RabbitMQ. On macOS:
brew install rabbitmq
Enable the management plugin:
rabbitmq-plugins enable rabbitmq_management
Access the management plugin at http://127.0.0.1:15672 (user: guest, password: guest)
In Python, use and contribute to yapw, our wrapper around Pika, to interact with RabbitMQ, because implementing threads, error handling, signal handling, etc. in every project is repetitive and non-trivial. That said, if you need to use Pika directly, see the examples in its documentation and on GitHub. Don’t use Celery, because its abstractions are inefficient, requiring complex workarounds.
Environment variables¶
Connect to the broker using a connection string stored in the RABBIT_URL environment variable.
Store the exchange name in the RABBIT_EXCHANGE_NAME environment variable, following the format {project}_{environment} or {project}_{service}_{environment}, and prefix routing keys with the exchange name. This makes it easy to create distinct exchanges for local development and test environments.
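A minimal sketch of reading these variables in Python. The default values, the underscore separator between the exchange name and the routing key, and the helper name routing_key are assumptions for illustration; the guidance above only requires that routing keys be prefixed with the exchange name.

```python
import os

# Illustrative defaults only; real deployments are expected to set both variables.
RABBIT_URL = os.getenv("RABBIT_URL", "amqp://guest:guest@127.0.0.1:5672/%2F")
RABBIT_EXCHANGE_NAME = os.getenv("RABBIT_EXCHANGE_NAME", "myproject_development")


def routing_key(name, exchange=RABBIT_EXCHANGE_NAME):
    """Prefix a routing key with the exchange name (underscore separator assumed)."""
    return f"{exchange}_{name}"
```

With RABBIT_EXCHANGE_NAME set to myproject_test, routing_key("loader") would produce myproject_test_loader, so that messages in the test environment never reach queues bound in another environment.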
Design decisions¶
Bindings¶
Consumers declare and bind queues, not publishers. To reduce coupling, a publisher does not control how its messages are routed: it simply sets the routing key. Each consumer then declares its own queue to read from, and it sets the routing keys that the queue binds to. As such, any number of consumers can read a publisher’s messages; if no consumer reads the messages, they are undelivered, by design. This pattern makes it easier to re-order, add or remove consumers.
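The division of responsibilities can be sketched as follows, assuming channel is a Pika channel (for example, one opened from a BlockingConnection). The function names, the direct exchange type and the durable flags are assumptions for illustration, not a description of yapw's implementation.

```python
def publish(channel, exchange, routing_key, body):
    """Publisher side: set the routing key only; declare no queues or bindings."""
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)


def subscribe(channel, exchange, queue, routing_keys, callback):
    """Consumer side: declare the queue, then bind it to each routing key it reads."""
    # Exchange type and durability are assumed here, not prescribed by this guide.
    channel.exchange_declare(exchange=exchange, exchange_type="direct", durable=True)
    channel.queue_declare(queue=queue, durable=True)
    for key in routing_keys:
        channel.queue_bind(exchange=exchange, queue=queue, routing_key=key)
    channel.basic_consume(queue=queue, on_message_callback=callback)
```

Adding a second consumer then means calling subscribe with a new queue name and the same routing keys; the publisher's code does not change.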
Heartbeat¶
If a consumer processes a message in the same thread as the heartbeat, the heartbeat can time out if the processing is slow, causing the connection to RabbitMQ to drop (for Python, see Pika’s readme).
The solution is to process the message in a separate thread (see Python example), as yapw does. Disabling heartbeats is highly discouraged.
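A simplified sketch of the separate-thread approach, modelled on the pattern in Pika's threaded consumer example. The process function is a hypothetical placeholder, and connection and channel are assumed to be Pika's BlockingConnection and its channel. Because Pika channels are not thread-safe, the worker thread does not ack directly; it schedules the ack on the connection's thread with add_callback_threadsafe.

```python
import functools
import threading


def process(body):
    """Placeholder for slow, application-specific work (hypothetical)."""
    return body.upper()


def work(connection, channel, delivery_tag, body):
    """Run in a worker thread, so the connection's thread stays free for heartbeats."""
    process(body)
    # Ask the connection's own thread to send the ack.
    ack = functools.partial(channel.basic_ack, delivery_tag=delivery_tag)
    connection.add_callback_threadsafe(ack)


def on_message(channel, method, properties, body, connection):
    """Consumer callback: hand the message to a new thread and return immediately."""
    thread = threading.Thread(
        target=work, args=(connection, channel, method.delivery_tag, body)
    )
    thread.start()
    return thread
```

To register the callback, the connection can be bound with functools.partial(on_message, connection=connection) and passed as on_message_callback to basic_consume. A real consumer would also keep track of its threads and join them on shutdown.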
See also
Acknowledgement timeouts, if processing is slow before acknowledgement.
Consumer prefetch¶
In an early production environment, prefetch count is set to 1, which is the most conservative option. In a mature production environment, it is set to 20, in order to scale first by using more threads before using more processes, based on this blog post.
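In Pika, the prefetch count is set with basic_qos on the channel before consuming. A minimal sketch, in which the constant names and the mature flag are assumptions for illustration:

```python
EARLY_PREFETCH_COUNT = 1  # most conservative: one unacknowledged message at a time
MATURE_PREFETCH_COUNT = 20  # allows one process to work on several messages in threads


def set_prefetch(channel, mature=False):
    """Limit how many unacknowledged messages the broker sends before waiting for acks."""
    count = MATURE_PREFETCH_COUNT if mature else EARLY_PREFETCH_COUNT
    channel.basic_qos(prefetch_count=count)
    return count
```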
Idempotence¶
Messages can be redelivered, and consumers must handle message redelivery gracefully. It is recommended to design consumers to be idempotent, rather than to explicitly perform deduplication.
To limit cascading redelivery (that is, where a consumer publishes messages but fails before acknowledging the received message, then receives the redelivered message and publishes the same messages again), publish messages immediately before acknowledging the received message: that is, after any potential failure.
To be idempotent, make state changes as late as possible: for example, write to the database immediately before publishing any messages and acknowledging the message. The worker should be as stateless as possible. It should not make changes to its internal state that carry over between received messages, since messages can arrive in any order.
The simplest form of deduplication is to delete previously written rows before writing new rows to the database.
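As an illustration of delete-before-write, the sketch below uses Python's built-in sqlite3 module so that it runs without a database server; the same pattern applies to other databases. The result table, its columns and the save_results helper are hypothetical.

```python
import sqlite3


def save_results(connection, collection_id, values):
    """Replace the rows for a collection, so a redelivered message has the same effect."""
    with connection:  # one transaction: commit on success, roll back on error
        connection.execute("DELETE FROM result WHERE collection_id = ?", (collection_id,))
        connection.executemany(
            "INSERT INTO result (collection_id, value) VALUES (?, ?)",
            [(collection_id, value) for value in values],
        )


connection = sqlite3.connect(":memory:")
connection.execute("CREATE TABLE result (collection_id INTEGER, value TEXT)")
save_results(connection, 1, ["a", "b"])
save_results(connection, 1, ["a", "b"])  # simulated redelivery of the same message
count = connection.execute("SELECT COUNT(*) FROM result").fetchone()[0]  # 2, not 4
```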
Database commits¶
If the consumer callback performs database operations, then all database operations before each message publication should be performed in a transaction. This ensures that, if the database operations fail and the incoming message is not acknowledged, then they have a chance to succeed when that message is redelivered, since no partial work had been committed. This guidance applies to each message publication, so that work is committed before the related message is published for further processing.
The message publication should not be within the transaction block, if using a with statement with psycopg2 or Django. This ensures that the commit completes (e.g. without integrity errors), before a message is published for further processing.
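The ordering can be sketched as below. To keep the example runnable without a database server, sqlite3 stands in for psycopg2 (in both, using the connection in a with statement commits on success and rolls back on error). The item table, the message format and the extra callback arguments are assumptions for illustration.

```python
import json
import sqlite3


def on_message(channel, method, properties, body, connection, exchange, routing_key):
    item = json.loads(body)
    # All database work for this publication happens in one transaction.
    with connection:
        connection.execute(
            "INSERT INTO item (id, name) VALUES (?, ?)", (item["id"], item["name"])
        )
    # Outside the block, the commit has completed; an integrity error would have
    # raised above, before anything was published.
    channel.basic_publish(
        exchange=exchange, routing_key=routing_key, body=json.dumps({"id": item["id"]})
    )
    # Ack immediately after publishing, to limit cascading redelivery.
    channel.basic_ack(delivery_tag=method.delivery_tag)
```

If the insert fails, the exception propagates: nothing is published, the message stays unacknowledged, and no partial work is committed.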
Acknowledgements¶
Usually, a message is ack’d once processing is complete. In some cases, a message is ack’d before its processing is complete:
When processing is long: If a message is not ack’d on a channel within the acknowledgement timeout (30 minutes by default), the broker closes the channel. This might cause unexpected errors the next time the consumer uses the channel.
When processing isn’t atomic: After some initial work, a consumer might perform work and publish messages in chunks, like when implementing the Splitter pattern. If it encounters an error in one chunk, the consumer cannot easily “retry” the original message, without encountering integrity errors and publishing duplicate messages. As such, the message is ack’d after the initial work (“point-of-no-return”).
If a consumer is interrupted or fails before a message is ack’d, the broker automatically requeues the message, once either the acknowledgement timeout or the heartbeat timeout is reached, at which time the consumer is considered buggy, stuck or unavailable by the broker.
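The “point-of-no-return” case described above can be sketched as follows. The comma-separated message format, the chunks helper and the extra callback arguments are assumptions for illustration; channel is assumed to be a Pika channel.

```python
def chunks(items, size):
    """Yield successive lists of at most `size` items."""
    for start in range(0, len(items), size):
        yield items[start:start + size]


def on_message(channel, method, properties, body, exchange, routing_key, size=2):
    # Initial work: here, only parsing the message (assumed comma-separated IDs).
    ids = body.decode().split(",")
    # Point of no return: ack now, since the chunked work below cannot be safely retried.
    channel.basic_ack(delivery_tag=method.delivery_tag)
    for chunk in chunks(ids, size):
        channel.basic_publish(
            exchange=exchange, routing_key=routing_key, body=",".join(chunk)
        )
```

The trade-off is that a failure after the ack is not retried by the broker, so it needs to be logged or otherwise surfaced by the consumer.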
When an exception is raised:
If the error is expected to occur (e.g. an integrity error due to a duplicate message), or if there’s no consequence to ignoring the message (take care not to cause a silent failure), the consumer should catch the error, write to a log, and nack the message.
Note
In Python, Pika’s basic_nack method sets requeue=True by default. Set requeue=False instead.
If the error isn’t expected to occur and it’s unknown whether it can safely be ignored, the consumer can do nothing (e.g. allow the exception to be raised), in which case administrative action is required (e.g. purging the queue or changing the code).
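A sketch of both cases in a single Pika callback. The handle function is a hypothetical placeholder, and sqlite3.IntegrityError stands in for the database driver's integrity error (e.g. psycopg2's), so that the example has no third-party dependencies.

```python
import logging
import sqlite3

logger = logging.getLogger(__name__)


def handle(body):
    """Placeholder for application-specific processing (hypothetical)."""


def on_message(channel, method, properties, body, handle=handle):
    try:
        handle(body)
    except sqlite3.IntegrityError:
        # Expected error (e.g. a duplicate message): log it and drop the message.
        logger.exception("Discarding message %r", body)
        # basic_nack requeues by default, so pass requeue=False explicitly.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        channel.basic_ack(delivery_tag=method.delivery_tag)
    # Any other exception propagates, leaving the message unacknowledged.
```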
See also
Message acknowledgment under Work Queues tutorial
Unused features¶
Topic exchanges¶
A topic exchange can be used to allow routing on multiple criteria. We don’t have a clear use case for this yet.
A topic exchange could support collection-specific queues, but priority queues appear to be a simpler way to prioritize collections.
Publisher confirms¶
It’s possible to ensure message delivery (see Python example) by using publisher confirms and setting the mandatory flag.
However, for simplicity, in Python, we use Pika’s BlockingConnection, which would use a “publish-and-wait” strategy for publisher confirms. This strategy is officially discouraged, because it would wait for each message to be persisted to disk.
The cases that publisher confirms protect against are, in Python:
pika.exceptions.UnroutableError: The message can’t be routed to any queue.
pika.exceptions.NackError: An internal error occurs in the process responsible for the queue.
Both cases are unlikely. To ensure messages are routable, before publishing a message, we make sure a queue exists and is bound to the exchange such that the message goes to that queue.