[Developer says] Message queues with Symfony2 and RabbitMQ

Message-Oriented Middleware (MOM)

Today's web applications have to perform many operations quickly, so handling everything synchronously is rarely the best option. Take the example of uploading a photo to a photo-sharing application. Once the upload is done, the application compresses the image and stores it, then creates a resized thumbnail and finally notifies all your followers about your new post. In a synchronous application the user would be blocked until all of these background operations are done. To avoid that, you can introduce message-oriented middleware into your application. This implies that the front-facing operations (e.g. image uploads) are implemented separately from the background operations (e.g. thumbnail creation, follower notification). Message-oriented middleware (MOM) is an architectural approach that enables message exchange between the distributed components of an application, allowing them to communicate asynchronously and making the application more responsive. One of the most widespread protocols for this kind of messaging is AMQP.
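To make the split between front-facing and background work concrete, here is a minimal in-process sketch. It assumes nothing about RabbitMQ: an SplQueue stands in for the broker, and handleUpload() and runWorker() are hypothetical names chosen for illustration. In a real setup the two functions would live in separate processes.

```php
<?php
// Illustration only: an in-memory SplQueue plays the role of the message broker.
// handleUpload() and runWorker() are hypothetical names, not part of any library.

function handleUpload(SplQueue $queue, $userId, $imagePath)
{
    // Front-facing part: describe the pending work in a message and return at once.
    $queue->enqueue(json_encode(array('user_id' => $userId, 'image_path' => $imagePath)));

    return 'upload accepted';
}

function runWorker(SplQueue $queue)
{
    // Background part: drain the queue at its own pace.
    $processed = array();
    while (!$queue->isEmpty()) {
        $message = json_decode($queue->dequeue(), true);
        // Thumbnail creation and follower notification would happen here.
        $processed[] = $message['image_path'];
    }

    return $processed;
}

$queue = new SplQueue();
$response = handleUpload($queue, 1235, '/path/to/new/pic.png');
$pendingAfterUpload = count($queue); // work is queued, not yet done
$processed = runWorker($queue);
```

The user-facing call returns as soon as the message is queued; the slow work happens whenever the worker gets to it.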

RabbitMQ

RabbitMQ is a message broker, that is, a system which routes messages from a sender (producer) to a recipient (consumer). Here's what RabbitMQ roughly looks like within an application.

[Figure: RabbitMQ within an application. Original image: https://gist.githubusercontent.com/fpapadopou/0a47bc92139865fb01a556d19b7110bf/raw/5ea3f70646bce57b46f2df21ec950c1ef1cc9ae5/image.jpeg]

  • The producer creates messages and delivers them to the exchange. For example, a message may contain the path of the uploaded picture.
  • An exchange decides in which queue (or queues) a received message will be placed, based on the configured binding rules. Exchanges can be direct (a message goes to the queues whose binding key exactly matches its routing key), fanout (a message is copied to every queue bound to the exchange, regardless of routing key) or topic-based (a message goes to the queues whose binding pattern matches its routing key).
  • Queues hold the messages until the consumer can process them.
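
The routing rules of the three exchange types can be sketched in plain PHP. This is a simplified model and not RabbitMQ's actual implementation: routeMessage() and topicWordsMatch() are hypothetical helpers, the queue names are made up, and each queue has a single binding key here, whereas a real queue can have several bindings.

```php
<?php
// Simplified model of exchange routing; hypothetical helpers, not a RabbitMQ API.

function topicWordsMatch(array $pattern, array $key)
{
    if (count($pattern) === 0) {
        return count($key) === 0;
    }
    $head = array_shift($pattern);
    if ($head === '#') {
        // '#' may absorb zero or more words: try every possible split.
        for ($skip = 0; $skip <= count($key); $skip++) {
            if (topicWordsMatch($pattern, array_slice($key, $skip))) {
                return true;
            }
        }
        return false;
    }
    if (count($key) === 0) {
        return false;
    }
    $word = array_shift($key);

    // '*' matches exactly one word; anything else must match literally.
    return ($head === '*' || $head === $word) && topicWordsMatch($pattern, $key);
}

// $bindings maps a queue name to the binding key that attaches it to the exchange.
function routeMessage($exchangeType, array $bindings, $routingKey)
{
    $queues = array();
    foreach ($bindings as $queue => $bindingKey) {
        if ($exchangeType === 'fanout') {
            $matches = true; // the routing key is ignored
        } elseif ($exchangeType === 'direct') {
            $matches = ($bindingKey === $routingKey);
        } else { // treated as 'topic'
            $matches = topicWordsMatch(explode('.', $bindingKey), explode('.', $routingKey));
        }
        if ($matches) {
            $queues[] = $queue;
        }
    }

    return $queues;
}

$bindings = array(
    'thumbnails'    => 'picture.uploaded',
    'notifications' => 'picture.*',
    'audit'         => '#',
);

$direct = routeMessage('direct', $bindings, 'picture.uploaded'); // thumbnails only
$fanout = routeMessage('fanout', $bindings, 'picture.uploaded'); // all three queues
$topic  = routeMessage('topic', $bindings, 'picture.deleted');   // notifications and audit
```

With the same bindings, the exchange type alone changes which queues receive a copy of the message.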

RabbitMQ is reliable, robust and supports several versions of the AMQP protocol, as well as other messaging protocols. Client implementations exist for all major operating systems and programming languages, which makes it a great option even if you are just getting started. Using RabbitMQ with Symfony 2 or 3 is really easy too, thanks to the rabbitmq-bundle. By following the next steps, you can configure the bundle and create a simple producer-consumer pair that handles the upload of a photo as described above.

Configuration

First, install the bundle with Composer:

composer require php-amqplib/rabbitmq-bundle

Then, enable the bundle in the application kernel (app/AppKernel.php file):

<?php
// app/AppKernel.php
public function registerBundles()
{
    $bundles = array(
        // ... (bundles already registered by the application)
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );

    return $bundles;
}

Add the rabbitmq bundle configuration to the main application configuration (app/config/config.yml file).

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost' # hostname and port of the rabbitmq server
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     true # a lazy connection avoids unnecessary connections to the broker on every request
            connection_timeout: 3
            read_write_timeout: 3
            keepalive: false
            heartbeat: 0
    producers:
        upload_picture:
            connection:       default # connects to the default connection configured above
            exchange_options: {name: 'upload-picture', type: direct} 
    consumers:
        upload_picture:
            connection:       default # connects to the default connection configured above
            exchange_options: {name: 'upload-picture', type: direct}
            queue_options:    {name: 'upload-picture'}
            callback:         callback_service # id of the service that wraps the UploadPictureConsumer defined below

Producer

The producer can be rather simple. In our case, it's the controller that handles the image upload operation. When the upload is done, it just sends a message to the exchange, indicating the path where the image was stored. One notable detail is that, although the producer is registered with the name upload_picture, you have to reference it as old_sound_rabbit_mq.upload_picture_producer.

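<?php

use Symfony\Bundle\FrameworkBundle\Controller\Controller;
use Symfony\Component\HttpFoundation\Request;
use Symfony\Component\HttpFoundation\Response;

class UploadController extends Controller
{
    public function uploadAction(Request $request)
    {
        // Generate a path
        // Store the image
        $message = ['user_id' => 1235, 'image_path' => '/path/to/new/pic.png'];
        // Publish the JSON-encoded message to the upload-picture exchange.
        $this->get('old_sound_rabbit_mq.upload_picture_producer')->publish(json_encode($message));

        // A controller action must return a Response; this body is only a placeholder.
        return new Response('Picture uploaded.');
    }
}

Consumer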

The consumer class must implement the ConsumerInterface provided by the rabbitmq bundle, which requires a single method, execute.

<?php
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

class UploadPictureConsumer implements ConsumerInterface
{
    public function execute(AMQPMessage $message)
    {
        // Getting here means the picture has successfully been uploaded. 
        // Now we can proceed with the rest of the operations.
        // $message will be an instance of `PhpAmqpLib\Message\AMQPMessage`.
        // The $message->body contains the data sent over RabbitMQ.

        try {
            $decodedData = json_decode($message->body, true);
            // resizeImage() and notifyFollowers() stand in for your own application code.
            $resizedImage = resizeImage($decodedData['image_path']);
            notifyFollowers($decodedData['user_id']);
        } catch (\Exception $e) {
            // If any of the above fails due to a temporary failure, return false,
            // which will re-queue the current message.
            return false;
        }
        // Returning true acknowledges the message, so it can safely be
        // removed from the queue.
        return true;
    }
}
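
One thing worth guarding against in a consumer is a malformed message: json_decode() returns null on invalid input instead of throwing, and re-queueing a message that can never be processed would only make it come back again. The sketch below validates the payload before any work is done. decodeUploadMessage() is a hypothetical helper, not part of the bundle, and the required keys are the ones used by the producer in this article.

```php
<?php
// decodeUploadMessage() is a hypothetical helper for illustration, not a bundle API.

function decodeUploadMessage($body)
{
    $data = json_decode($body, true);

    // json_decode() yields null for malformed JSON rather than raising an exception.
    if (!is_array($data)) {
        return null;
    }
    // Both keys published by the producer must be present.
    if (!isset($data['user_id'], $data['image_path']) || !is_string($data['image_path'])) {
        return null;
    }

    return $data;
}

$valid   = decodeUploadMessage('{"user_id": 1235, "image_path": "/path/to/new/pic.png"}');
$garbled = decodeUploadMessage('not json');
$partial = decodeUploadMessage('{"user_id": 1235}');
```

Inside execute, a null result is a permanent failure, so it should be rejected without re-queueing. The bundle's ConsumerInterface appears to define constants for this (for example MSG_REJECT), but check the version you have installed before relying on them.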

How to run RabbitMQ

In order for this setup to work you need to have a running rabbitmq server. All registered consumers must be started too.

The RabbitMQ server can be started by just running rabbitmq-server on the command line. As soon as the server starts, and provided the management plugin is enabled, you can navigate to http://localhost:15672/ and get an overview of the queues and exchanges that are active at the moment.

The consumer must be run through Symfony's console, like this:

php app/console rabbitmq:consumer upload_picture

By default the consumer will process messages in an endless loop. However, you can configure several properties of the consumer's execution, such as the number of messages to be consumed before the consumer exits (the -m option), or the maximum amount of memory that can be used by the consumer process (the -l option). Note that both rabbitmq-server and the consumer are blocking processes, so you will have to run them under a process supervisor such as supervisord. Your setup should also be able to start a new consumer in case one of them exits unexpectedly, which a supervisor can do for you.

Conclusion

This article is just an introduction to using RabbitMQ with Symfony. A lot more parameters need to be configured before your application is ready to go live. If you feel RabbitMQ fits your needs and want to dive in, here are some useful resources: