Python_Language

Python TutorialGetting Started with PythonPython Basic SyntaxPython DatatypesPython IndentationPython Collection TypesPython Basic Input and OutputPython Built in Modules and FunctionsPython FunctionsChemPy - python packageCreating Python packagesFunctional Programming in PythonIncompatibilities moving from Python 2 to Python 3IoT Programming with Python and Raspberry PIKivy - Cross-platform Python Framework for NUI DevelopmentMutable vs Immutable (and Hashable) in PythonPyInstaller - Distributing Python CodePython *args and **kwargsPython 2to3 toolPython Abstract Base Classes (abc)Python Abstract syntax treePython Alternatives to switch statement from other languagesPython and ExcelPython Anti-PatternsPython ArcPyPython ArraysPython Asyncio ModulePython Attribute AccessPython AudioPython Binary DataPython Bitwise OperatorsPython Boolean OperatorsPython Checking Path Existence and PermissionsPython ClassesPython CLI subcommands with precise help outputPython Code blocks, execution frames, and namespacesPython Collections modulePython Comments and DocumentationPython Common PitfallsPython Commonwealth ExceptionsPython ComparisonsPython Complex mathPython concurrencyPython ConditionalsPython configparserPython Context Managers (with Statement)Python Copying dataPython CountingPython ctypesPython Data SerializationPython Data TypesPython Database AccessPython Date and TimePython Date FormattingPython DebuggingPython DecoratorsPython Defining functions with list argumentsPython DeploymentPython Deque ModulePython DescriptorPython Design PatternsPython DictionaryPython Difference between Module and PackagePython DistributionPython DjangoPython Dynamic code execution with `exec` and `eval`Python EnumPython ExceptionsPython ExponentiationPython Files & Folders I/OPython FilterPython FlaskPython Functools ModulePython Garbage CollectionPython GeneratorsPython getting start with GZipPython graph-toolPython groupby()Python hashlibPython HeapqPython Hidden FeaturesPython HTML ParsingPython HTTP ServerPython IdiomsPython ijsonPython Immutable datatypes(int, float, str, tuple and frozensets)Python Importing modulesPython Indexing and SlicingPython Input, Subset and Output External Data Files using PandasPython Introduction to RabbitMQ using AMQPStorm



Python Introduction to RabbitMQ using AMQPStorm

From WikiOD

Remarks[edit | edit source]

The latest version of AMQPStorm is available at pypi or you can install it using pip

pip install amqpstorm

How to consume messages from RabbitMQ[edit | edit source]

Start with importing the library.

from amqpstorm import Connection

When consuming messages, we first need to define a function to handle the incoming messages. This can be any callable function, and has to take a message object, or a message tuple (depending on the to_tuple parameter defined in start_consuming).

Besides processing the data from the incoming message, we will also have to Acknowledge or Reject the message. This is important, as we need to let RabbitMQ know that we properly received and processed the message.

def on_message(message):
    """This function is called on message received.

    :param message: Delivered message.
    :return:
    """
    print("Message:", message.body)

    # Acknowledge that we handled the message without any issues.
    message.ack()

    # Reject the message.
    # message.reject()

    # Reject the message, and put it back in the queue.
    # message.reject(requeue=True)

Next we need to set up the connection to the RabbitMQ server.

connection = Connection('127.0.0.1', 'guest', 'guest')

After that we need to set up a channel. Each connection can have multiple channels, and in general when performing multi-threaded tasks, it's recommended (but not required) to have one per thread.

channel = connection.channel()

Once we have our channel set up, we need to let RabbitMQ know that we want to start consuming messages. In this case we will use our previously defined on_message function to handle all our consumed messages.

The queue we will be listening to on the RabbitMQ server is going to be simple_queue, and we are also telling RabbitMQ that we will be acknowledging all incoming messages once we are done with them.

channel.basic.consume(callback=on_message, queue='simple_queue', no_ack=False)

Finally we need to start the IO loop to start processing messages delivered by the RabbitMQ server.

channel.start_consuming(to_tuple=False)

How to publish messages to RabbitMQ[edit | edit source]

Start with importing the library.

from amqpstorm import Connection
from amqpstorm import Message

Next we need to open a connection to the RabbitMQ server.

connection = Connection('127.0.0.1', 'guest', 'guest')

After that we need to set up a channel. Each connection can have multiple channels, and in general when performing multi-threaded tasks, it's recommended (but not required) to have one per thread.

channel = connection.channel()

Once we have our channel set up, we can start to prepare our message.

# Message Properties.
properties = {
    'content_type': 'text/plain',
    'headers': {'key': 'value'}
}

# Create the message.
message = Message.create(channel=channel, body='Hello World!', properties=properties)

Now we can publish the message by simply calling publish and providing a routing_key. In this case we are going to send the message to a queue called simple_queue.

message.publish(routing_key='simple_queue')

How to create a delayed queue in RabbitMQ[edit | edit source]

First we need to set up two basic channels, one for the main queue, and one for the delay queue. In my example at the end, I include a couple of additional flags that are not required, but makes the code more reliable; such as confirm delivery, delivery_mode and durable. You can find more information on these in the RabbitMQ manual.

After we have set up the channels we add a binding to the main channel that we can use to send messages from the delay channel to our main queue.

channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

Next we need to configure our delay channel to forward messages to the main queue once they have expired.

delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000,
    'x-dead-letter-exchange': 'amq.direct',
    'x-dead-letter-routing-key': 'hello'
})

x-message-ttl (Message - Time To Live)

This is normally used to automatically remove old messages in the queue after a specific duration, but by adding two optional arguments we can change this behaviour, and instead have this parameter determine in milliseconds how long messages will stay in the delay queue.

x-dead-letter-routing-key

This variable allows us to transfer the message to a different queue once they have expired, instead of the default behaviour of removing it completely.

x-dead-letter-exchange

This variable determines which Exchange used to transfer the message from hello_delay to hello queue.

Publishing to the delay queue

When we are done setting up all the basic Pika parameters you simply send a message to the delay queue using basic publish.

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mod': 2})

Once you have executed the script you should see the following queues created in your RabbitMQ management module. Introduction_to_RabbitMQ_using_AMQPStorm

Example.

from amqpstorm import Connection

connection = Connection('127.0.0.1', 'guest', 'guest')

# Create normal 'Hello World' type channel.
channel = connection.channel()
channel.confirm_deliveries()
channel.queue.declare(queue='hello', durable=True)

# We need to bind this channel to an exchange, that will be used to transfer
# messages from our delay queue.
channel.queue.bind(exchange='amq.direct', routing_key='hello', queue='hello')

# Create our delay channel.
delay_channel = connection.channel()
delay_channel.confirm_deliveries()

# This is where we declare the delay, and routing for our delay channel.
delay_channel.queue.declare(queue='hello_delay', durable=True, arguments={
    'x-message-ttl': 5000, # Delay until the message is transferred in milliseconds.
    'x-dead-letter-exchange': 'amq.direct', # Exchange used to transfer the message from A to B.
    'x-dead-letter-routing-key': 'hello' # Name of the queue we want the message transferred to.
})

delay_channel.basic.publish(exchange='',
                            routing_key='hello_delay',
                            body='test',
                            properties={'delivery_mode': 2})

print("[x] Sent")

Credit:Stack_Overflow_Documentation