In the previous story I described how to install RabbitMQ on Windows.
Here I will describe how to get RabbitMQ with the management UI.
The easiest way is to use the Docker image rabbitmq:3.8.9-management-alpine (I prefer the distribution based on Alpine Linux). The Dockerfile for the Alpine variant can be found here https://github.com/docker-library/rabbitmq/blob/888638927482f86af6e88bebb67423926cb1112f/3.8/alpine/management/Dockerfile
It doesn’t have the ability to move messages from one queue to another (the plugins rabbitmq_shovel and rabbitmq_shovel_management are not enabled). The code below fixes this:
I have changed only line 4:
RUN rabbitmq-plugins enable --offline rabbitmq_management rabbitmq_shovel rabbitmq_shovel_management
I have added
Let’s suppose you’ve built this docker image with name
Now, your code will typically run in another Docker container. The best practice for establishing a network connection between two Docker containers is to create a network. See https://www.middlewareinventory.com/blog/docker-connect-containers-together/ for more details.
docker network create rabbitnet
Now, start a Docker container based on the rabbitmq-i image:
docker run -d --hostname rabbitmq --name rabbitmq -p 15672:15672 -p 5672:5672 --network rabbitnet rabbitmq-i
-d — start it in detached mode (in background, without attaching the console to the process’s standard input, standard output, and standard error).
--hostname: the container’s host name. RabbitMQ uses it internally, for example as part of the name of your RabbitMQ cluster.
--name: the name of the Docker container. It is used to communicate with this container from another container. It is better for it to be the same as
-p 15672:15672 -p 5672:5672: port 5672 is exposed to connect to RabbitMQ and port 15672 is exposed for the management UI. I’m using plain TCP, so I’m exposing port 15672. If you want to use TLS, you should expose port 15671. Quote from the documentation:
As mentioned in the Certificates and Keys section, TLS has two primary purposes: encrypting connection traffic and providing a way to verify that the peer can be trusted (e.g. signed by a trusted Certificate Authority) to mitigate against Man-in-the-Middle attacks, a class of attacks where an attacker impersonates a legitimate trusted peer (usually a server). This section will focus on the latter.
--network rabbitnet: this is the crucial part of the definition. It connects the rabbitmq-i container to the internal network rabbitnet created above.
Now, suppose that you have another Docker container with pika==1.1.0 installed. Then you can have the following blocking message producer:
When you run this Docker container, add --network rabbitnet to the docker run command so that it can communicate with the rabbitmq container (which also sits on rabbitnet, see above).
Let’s go over the code:
In line 8: tutorials often use URLParameters. URLParameters and ConnectionParameters both extend the Parameters class, and an instance of either can be passed to the connection's __init__ method, so you can use either of them. I’m using all default values except host. I’m using rabbitmq as the host. Because this Docker container is connected (via rabbitnet) to the container based on the rabbitmq-i image, which is named rabbitmq, our TCP call will reach the container running RabbitMQ.
Note: The same code will also work with RabbitMQ installed on a Windows machine. You only need to modify the C:\Windows\System32\drivers\etc\hosts file. Add the following line to it:
Now, your calls to the “rabbitmq” hostname will reach your localhost (where you have RabbitMQ installed).
In line 10: we’re creating the simplest BlockingConnection with the connection parameters above. See Comparing Message Publishing with BlockingConnection and SelectConnection, Connecting to RabbitMQ with Callback-Passing Style, and Asynchronous publisher example for alternative approaches (using Pika). We use a context manager to close the connection automatically at the end.
In line 12 — we’re getting
In line 15 (not in use): we can create the queue programmatically. Personally, I prefer to do it in the management UI, but you can also do it in code. Quote from the docstring:
Declare queue, create if needed. This method creates or checks a queue. When creating a new queue the client can specify various properties that control the durability of the queue and its contents, and the level of sharing for the queue.
Use an empty string as the queue name for the broker to auto-generate one. Retrieve this auto-generated queue name from the returned `spec.Queue.DeclareOk` method frame.
:param str queue: The queue name; if empty string, the broker will create a unique queue name
:param bool passive: Only check to see if the queue exists and raise `ChannelClosed` if it doesn't
:param bool durable: Survive reboots of the broker
:param bool exclusive: Only allow access by the current connection
:param bool auto_delete: Delete after consumer cancels or disconnects
:param dict arguments: Custom key/value arguments for the queue
:returns: Method frame from the Queue.Declare-ok response
:rtype: `pika.frame.Method` having `method` attribute of type
In line 18: we’re turning on publisher confirms, RabbitMQ's proprietary mechanism for ensuring that messages are delivered to the broker. For more details see https://www.rabbitmq.com/confirms.html
In lines 20–25: we’re actually sending the message to RabbitMQ.
basic_publish method publishes a message to a specific exchange. The message will be routed to queues as defined by the exchange configuration and distributed to any active consumers when the transaction, if any, is committed.
An empty exchange name means that the default exchange is used.
routing_key: the routing key to bind on. For the default exchange, the queue name can be supplied, so the message will be sent to the appropriate queue. Here, we’re sending to the
body — The message body.
properties (unused) — BasicProperties of the message.
- delivery_mode=1 means TRANSIENT_DELIVERY_MODE. The message is not persisted and will disappear when the broker restarts.
- delivery_mode=2 means PERSISTENT_DELIVERY_MODE. You should also declare the queue as durable (see queue_declare above; you can also do it in the management UI). In this case the message is stored on disk and survives a broker restart.
First, some background: both persistent and transient messages can be written to disk. Persistent messages will be written to disk as soon as they reach the queue, while transient messages will be written to disk only so that they can be evicted from memory while under memory pressure. Persistent messages are also kept in memory when possible and only evicted from memory under memory pressure. The “persistence layer” refers to the mechanism used to store messages of both types to disk.
mandatory — This flag tells the server how to react if the message cannot be routed to a queue. If this flag is set, the server will return an unroutable message with a Return method. If this flag is zero, the server silently drops the message.
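The publishing call described above can be sketched roughly as follows. This is a minimal sketch, not the original producer: publish_text is a hypothetical helper, channel is assumed to be a pika BlockingChannel (or any object exposing the same basic_publish method), and local is the queue name used elsewhere in this story.

```python
def publish_text(channel, text, queue="local"):
    """Publish one text message to `queue` through the default exchange.

    `channel` is assumed to behave like pika's BlockingChannel.
    """
    channel.basic_publish(
        exchange="",            # empty name selects the default exchange
        routing_key=queue,      # for the default exchange this is the queue name
        body=text.encode("utf-8"),
        mandatory=True,         # ask the broker to return unroutable messages
    )
```

With pika installed, a channel could be obtained from pika.BlockingConnection(pika.ConnectionParameters(host="rabbitmq")).channel(), and then publish_text(channel, "hello") would send one message.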
Basically, this code uses the push model: RabbitMQ “pushes” messages to this code, and every message from the local queue will be processed in
Note: In PyCharm, Ctrl+C doesn't work for me; I was unable to send the SIGINT signal to interrupt the channel.start_consuming() method. Killing the process leaves the connection/channel open in RabbitMQ, so be sure to close it manually. It is better to run this code from the CLI.
See https://pika.readthedocs.io/en/stable/examples/blocking_consume.html for details.
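For illustration, the push model could look roughly like this. It is a sketch under assumptions: make_on_message and consume_forever are hypothetical names, channel is assumed to behave like a pika BlockingChannel, and handle is whatever function you want to run for each message body.

```python
def make_on_message(handle):
    """Build a callback with the signature pika passes to consumers."""
    def on_message(channel, method, properties, body):
        handle(body)
        # Acknowledge only after the message was handled successfully.
        channel.basic_ack(delivery_tag=method.delivery_tag)
    return on_message


def consume_forever(channel, handle, queue="local"):
    """Register the callback and block while the broker pushes messages."""
    channel.basic_consume(queue=queue, on_message_callback=make_on_message(handle))
    try:
        channel.start_consuming()   # blocks until stopped or interrupted
    except KeyboardInterrupt:
        channel.stop_consuming()    # Ctrl+C from the CLI ends consumption cleanly
```

The consumer has no natural stopping point here: it keeps waiting for new messages until it is interrupted.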
Basically, this code uses the pull model: it “pulls” messages from RabbitMQ. This code has full control over when it stops processing messages. It is also able to tell when the queue is empty.
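One way to pull messages is sketched below, assuming basic_get is used for pulling (the original code may pull differently). drain_queue is a hypothetical helper and channel is assumed to behave like a pika BlockingChannel, whose basic_get returns (None, None, None) when the queue is empty.

```python
def drain_queue(channel, handle, queue="local"):
    """Pull messages one by one until the queue is empty.

    Returns the number of messages that were processed.
    """
    processed = 0
    while True:
        method, properties, body = channel.basic_get(queue=queue, auto_ack=False)
        if method is None:
            # Nothing left to pull: the caller decides what to do next.
            return processed
        handle(body)
        channel.basic_ack(delivery_tag=method.delivery_tag)
        processed += 1
```

Because each call asks for a single message, the loop can stop at any point and knows exactly when the queue has run dry.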
Note: I use Twisted==20.3.0. You need to install some extra packages to make this code work.
I will not go through every line, only the interesting ones.
In lines 74–76 we’re putting at least 2 messages into the queue (so that it is not empty).
In lines 12–13 we have 2 flags: finished and finished_defered. These variables are used for communication between threads (details are provided below).
Note: Python stores all objects on a heap, in main memory, so you don’t need anything like volatile in Python. See https://stackoverflow.com/a/53780395/1137529 for details.
finished: the queue has been fully populated, i.e. the producers have put all items into the queue.
finished_defered: the queue has been fully processed, i.e. all items have been consumed from the queue.
In lines 51–52 we’re starting the Twisted reactor in a dedicated thread.
In general, in a Twisted application the reactor should be started in the main thread. Because we’re not starting it in the main thread, we can’t install signal handlers, so we’re passing False. This is a non-standard way of doing it: I don’t want to write a Twisted application here, I only want to use the Twisted event loop to consume items from the queue.
In line 60:
this code runs the defer_pika_queue_consume(channel) function in the dedicated thread where the Twisted event loop is running.
In lines 62–63 (main() function) we have
finished = True
Here we’re modelling that the producers have finished filling the queue. We’re sending this signal to the on_message callback. Now, if the queue is empty, it should stop consuming from the queue. By default, if the queue is empty, it waits for new messages to arrive.
In lines 65–66 (main() function) we have
while not finished_defered:
The main() function waits until the on_message() callback signals that all items from the queue have been consumed and that we can close the connection/channel to the queue.
- Here we’re using spin-lock implementation of
- This implementation is not mutex-based (not RLock-based). See https://stackoverflow.com/q/10236947/1137529 for details.
- We can’t use connection.sleep() here because the reactor event loop runs in a different thread.
In line 32
for method_frame, properties, body in channel.consume("local", inactivity_timeout=10.0):
We’re passing inactivity_timeout so that the consume() call yields control after the timeout, which lets us check the finished flag. If it is True and the queue is empty, we should stop consuming items from the queue.
In lines 39–44: once the finished flag is True, we use the channel.get_waiting_message_count() call to check whether we have processed the last item from the queue. This avoids waiting on the queue. The following code is also correct:
But this is less optimal: here we need at least one blocking call (one that waits for the timeout) before we can break the loop.
In the provided example, if we received the finished signal before the last item was processed, we can break the loop without ever blocking.
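The loop discussed in this section can be sketched as follows. It is not the original code: consume_until_finished and is_finished are hypothetical names, and channel is assumed to behave like a pika BlockingChannel. Note that get_waiting_message_count() reports messages already delivered to the consumer generator, which is what lets the loop exit without another blocking wait.

```python
def consume_until_finished(channel, handle, is_finished, queue="local", timeout=10.0):
    """Consume from `queue` until producers are done and nothing is waiting.

    `is_finished` is a zero-argument callable returning the `finished` flag.
    """
    for method, properties, body in channel.consume(queue, inactivity_timeout=timeout):
        if method is not None:
            handle(body)
            channel.basic_ack(delivery_tag=method.delivery_tag)
        # On an inactivity timeout all three values are None, so we only
        # fall through to the check below.
        if is_finished() and channel.get_waiting_message_count() == 0:
            break
    channel.cancel()  # stop the consumer generator on the broker side
```

If the finished flag is already set when the last message is handled, the loop breaks right away instead of waiting for the inactivity timeout.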