RabbitMQ

The shortest explanation of RabbitMQ is the AMQP 0-9-1 Model in Brief:

  • Multiple publishers can publish messages to exchanges

  • An exchange routes the messages it receives to one or more queues, using rules called bindings

  • Multiple consumers can subscribe to one or more queues

You should be familiar with all of the RabbitMQ tutorials.

Development

  1. Install RabbitMQ. On macOS:

    brew install rabbitmq
    
  2. Enable the management plugin:

    rabbitmq-plugins enable rabbitmq_management
    
  3. Access the management plugin at http://127.0.0.1:15672 (user: guest, password: guest)

In Python, use and contribute to yapw, our wrapper around Pika, to interact with RabbitMQ, because implementing threads, error handling, signal handling, etc. in every project is repetitive and non-trivial. That said, if you need to use Pika directly, see the examples in its documentation and on GitHub. Don’t use Celery, because its abstractions are inefficient, requiring complex workarounds.

Environment variables

Connect to the broker using a connection string stored in the RABBIT_URL environment variable.

Store the exchange name in the RABBIT_EXCHANGE_NAME environment variable, following the format {project}_{environment} or {project}_{service}_{environment}, and prefix routing keys with the exchange name. This makes it easy to create distinct exchanges for local development and test environments.
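As a minimal sketch of this convention, the snippet below reads both variables and prefixes a routing key with the exchange name. The default values, the full_routing_key helper and the underscore separator are assumptions for illustration, not something this guide prescribes.

```python
import os

# Assumed fallbacks for local development; deployments set both variables.
RABBIT_URL = os.getenv("RABBIT_URL", "amqp://guest:guest@127.0.0.1:5672/%2F")
RABBIT_EXCHANGE_NAME = os.getenv("RABBIT_EXCHANGE_NAME", "myproject_development")


def full_routing_key(routing_key, exchange_name=RABBIT_EXCHANGE_NAME):
    """Prefix a short routing key with the exchange name (hypothetical helper)."""
    # The underscore separator is an assumption for this sketch.
    return f"{exchange_name}_{routing_key}"
```

With RABBIT_EXCHANGE_NAME set to myproject_test, full_routing_key("loader") would give myproject_test_loader, so a test environment never shares routing keys with local development.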

Design decisions

Bindings

Consumers declare and bind queues, not publishers. To reduce coupling, a publisher does not control how its messages are routed: it simply sets the routing key. Each consumer then declares its own queue to read from, and it sets the routing keys that the queue binds to. As such, any number of consumers can read a publisher’s messages; if no consumer reads the messages, they are undelivered, by design. This pattern makes it easier to re-order, add or remove consumers.
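The division of responsibilities can be sketched as follows, assuming a Pika channel (or any object with the same methods) is passed in. The function names, the direct exchange type and the durable flags are assumptions for illustration.

```python
def declare_consumer_queue(channel, exchange, queue, routing_keys):
    """Consumer side: declare its own queue and choose which keys it binds to."""
    # Declarations are repeatable as long as the arguments match earlier ones.
    channel.exchange_declare(exchange=exchange, exchange_type="direct", durable=True)
    channel.queue_declare(queue=queue, durable=True)
    for routing_key in routing_keys:
        channel.queue_bind(queue=queue, exchange=exchange, routing_key=routing_key)


def publish(channel, exchange, routing_key, body):
    """Publisher side: set a routing key only, with no knowledge of any queue."""
    channel.basic_publish(exchange=exchange, routing_key=routing_key, body=body)
```

Adding a second consumer then means calling declare_consumer_queue with a new queue name; the publisher's code does not change.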

Heartbeat

If a consumer takes too long to process a message, the heartbeat might time out, causing the connection to RabbitMQ to drop (for Python, see Pika’s readme and example).

Disabling heartbeats is highly discouraged. The solution is to process the message in a separate thread (see Python example), as yapw does.
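A rough sketch of the threaded approach with Pika's BlockingConnection follows. The function names and the process handler are hypothetical, and a real consumer would also track and join its threads on shutdown.

```python
import functools
import threading


def ack_message(channel, delivery_tag):
    """Runs on the connection's thread; skip the ack if the channel has closed."""
    if channel.is_open:
        channel.basic_ack(delivery_tag)


def do_work(connection, channel, delivery_tag, body, process):
    """Runs in a worker thread, leaving the connection's thread free for heartbeats."""
    process(body)
    # Channels aren't thread-safe, so hand the ack back to the connection's thread.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, channel, delivery_tag)
    )


def on_message(channel, method, properties, body, connection, process):
    """Consumer callback: start one thread per message and return immediately."""
    thread = threading.Thread(
        target=do_work, args=(connection, channel, method.delivery_tag, body, process)
    )
    thread.start()
    return thread
```

The callback would be registered with something like channel.basic_consume(queue=..., on_message_callback=functools.partial(on_message, connection=connection, process=handler)), where handler is the project's own function.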

That said, from Datlab’s experience, the RabbitMQ connection can be unreliable, regardless of the connection settings.

Consumer prefetch

In an early production environment, prefetch count is set to 1, which is the most conservative option. In a mature production environment, it is set to 20, in order to scale first by using more threads before using more processes, based on this blog post.
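For illustration, the two settings could be applied through Pika's basic_qos as sketched below. The constant names and the mature flag are hypothetical; how a project decides that it is "mature" is outside the scope of this sketch.

```python
EARLY_PREFETCH_COUNT = 1  # most conservative: one unacknowledged message at a time
MATURE_PREFETCH_COUNT = 20  # lets threads work on several messages concurrently


def set_prefetch(channel, mature=False):
    """Cap the number of unacknowledged messages delivered to this consumer."""
    prefetch_count = MATURE_PREFETCH_COUNT if mature else EARLY_PREFETCH_COUNT
    channel.basic_qos(prefetch_count=prefetch_count)
    return prefetch_count
```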

Idempotence

Messages can be redelivered, and consumers must handle message redelivery gracefully. It is recommended to design consumers to be idempotent, rather than to explicitly perform deduplication.

To limit cascading redelivery (where a consumer publishes messages but fails before acknowledging the received message, then receives the redelivered message and publishes the same messages again), publish messages immediately before acknowledging the received message: that is, after any potential failure.

To be idempotent, make state changes as late as possible: for example, write to the database immediately before publishing any messages and acknowledging the message. The worker should be as stateless as possible. It should not make changes to its internal state that carry over between received messages, since messages can arrive in any order.

The simplest form of deduplication is to delete previously written rows before writing new rows to the database.
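A small sketch of the delete-then-write approach, using the standard library's sqlite3 so that it runs anywhere. The result table, its columns and the replace_rows helper are hypothetical.

```python
import sqlite3


def replace_rows(conn, collection_id, values):
    """Delete rows left by an earlier delivery, then write the new rows."""
    with conn:  # commits on success, rolls back if an exception is raised
        conn.execute("DELETE FROM result WHERE collection_id = ?", (collection_id,))
        conn.executemany(
            "INSERT INTO result (collection_id, value) VALUES (?, ?)",
            [(collection_id, value) for value in values],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE result (collection_id INTEGER, value TEXT)")

# Handling the same message twice leaves the same rows as handling it once.
replace_rows(conn, 1, ["a", "b"])
replace_rows(conn, 1, ["a", "b"])
```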

Database commits

If the consumer callback performs database operations, then all database operations before each message publication should be performed in a transaction. This ensures that, if the database operations fail and the incoming message is not acknowledged, then they have a chance to succeed when that message is redelivered, since no partial work had been committed. This guidance applies to each message publication, so that work is committed before the related message is published for further processing.

If using a with statement with psycopg2 or Django, publish the message outside the transaction block. This ensures that the commit completes (e.g. without integrity errors) before a message is published for further processing.
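The ordering can be sketched as below. The standard library's sqlite3 stands in for psycopg2 or Django here, only so that the example is self-contained; the item table, the handle function and the publish callable are hypothetical.

```python
import sqlite3


def handle(conn, publish, item_id):
    """Commit the database work first, then publish the follow-up message."""
    with conn:  # the transaction commits when this block exits without error
        conn.execute("INSERT INTO item (id) VALUES (?)", (item_id,))
    # Outside the block: reached only if the work above was committed.
    publish({"item_id": item_id})
```

If the insert raises an integrity error, the exception propagates before publish is called, so no downstream consumer is told about work that was never committed.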

Acknowledgements

Usually, a message is ack’d once processing is complete. In some cases, a message is ack’d before its processing is complete:

  • When processing is long: If a message is not ack’d on a channel within the acknowledgement timeout (30 minutes by default), the broker closes the channel. This might cause unexpected errors the next time the consumer uses the channel.

  • When processing isn’t atomic: After some initial work, a consumer might perform work and publish messages in chunks, like when implementing the Splitter pattern. If it encounters an error in one chunk, the consumer cannot easily “retry” the original message, without encountering integrity errors and publishing duplicate messages. As such, the message is ack’d after the initial work (“point-of-no-return”).

If a consumer is interrupted or fails before a message is ack’d, the broker automatically requeues the message, once either the acknowledgement timeout or the heartbeat timeout is reached, at which time the consumer is considered buggy, stuck or unavailable by the broker.

When an exception is raised:

  • If the error is expected to occur (e.g. an integrity error due to a duplicate message), or if there’s no consequence to ignoring the message (taking care not to cause a silent failure), the consumer should catch the error, write to a log, and nack the message.

    Note

    In Python, Pika’s basic_nack method sets requeue=True by default. Set requeue=False instead.

  • If the error isn’t expected to occur and it’s unknown whether it can safely be ignored, the consumer can do nothing (e.g. allow the exception to be raised), in which case administrative action is required (e.g. purging the queue or changing the code).
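The two cases above might look like this in a Pika consumer callback. ExpectedError and the process handler are hypothetical stand-ins; a real consumer would catch its own specific exception types.

```python
import logging

logger = logging.getLogger(__name__)


class ExpectedError(Exception):
    """Hypothetical stand-in for an error that is safe to skip, like a duplicate."""


def on_message(channel, method, properties, body, process):
    try:
        process(body)
    except ExpectedError:
        logger.exception("Skipping message %r", body)
        # Pika defaults to requeue=True, which would redeliver the failing message.
        channel.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
    else:
        channel.basic_ack(delivery_tag=method.delivery_tag)
    # Any other exception propagates, leaving the message unacknowledged.
```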

See also

Message acknowledgment, in the Work Queues tutorial

Unused features

Topic exchanges

A topic exchange can be used to allow routing on multiple criteria. We don’t have a clear use case for this yet.

A topic exchange could support collection-specific queues, but priority queues appear to be a simpler way to prioritize collections.

Publisher confirms

It’s possible to ensure message delivery (see Python example) by using publisher confirms and setting the mandatory flag.

However, for simplicity, in Python, we use Pika’s BlockingConnection. With it, publisher confirms would follow a “publish-and-wait” strategy, which is officially discouraged because it waits for each message to be persisted to disk.

The cases that publisher confirms protect against are, in Python:

All these are unlikely. To ensure messages are routable, before publishing a message, we make sure a queue exists and is bound to the exchange such that the message goes to that queue.